
[DC-2692] synthetic script enhancements
* uses cleaning rules to clean survey_conduct table data
* removes duplicated code for creating cleaned survey_conduct table data
* prepares for potentially running all rules from RDR ingest through the RT clean dataset
* still runs only the subset of rules marked run_for_synthetic (sketched below)
lrwb-aou committed Aug 28, 2023
1 parent 7b820f7 commit 70ed906
Showing 8 changed files with 32 additions and 109 deletions.
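To make the gating described in the commit message concrete: cleaning rules carry a run_for_synthetic flag, and the synthetic pipeline applies only the rules that set it. The sketch below is illustrative only; apart from the run_for_synthetic keyword, the class, parameter, and helper names are assumptions rather than the project's actual API.

class BaseCleaningRule:
    """Illustrative base class; only the run_for_synthetic keyword mirrors the commit."""

    def __init__(self, project_id, dataset_id, sandbox_dataset_id,
                 table_namer=None, run_for_synthetic=False):
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.sandbox_dataset_id = sandbox_dataset_id
        self.table_namer = table_namer
        # Rules opt out by default; a rule must pass run_for_synthetic=True
        # to be applied when generating synthetic data.
        self.run_for_synthetic = run_for_synthetic


def rules_for_synthetic(rules):
    """Keep only the rules flagged to run against the synthetic dataset."""
    return [rule for rule in rules if rule.run_for_synthetic]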
3 changes: 2 additions & 1 deletion data_steward/cdr_cleaner/clean_cdr.py
@@ -434,7 +434,8 @@
DataStage.SYNTHETIC.value:
RDR_CLEANING_CLASSES + COMBINED_CLEANING_CLASSES +
REGISTERED_TIER_DEID_CLEANING_CLASSES +
-REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES,
+REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES +
+REGISTERED_TIER_DEID_CLEAN_CLEANING_CLASSES,
}


Expand Up @@ -74,7 +74,7 @@
ON m.questionnaire_response_id = sc.survey_conduct_id
WHERE m.questionnaire_response_id IS NOT NULL
) sub
-WHERE sub.survey_conduct_id = sc.survey_conduct_id
+WHERE sub.survey_conduct_id = sc.survey_conduct_id
AND sc.survey_conduct_id IN (SELECT survey_conduct_id FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`)
""")

@@ -103,7 +103,8 @@ def __init__(self,
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
-table_namer=table_namer)
+table_namer=table_namer,
+run_for_synthetic=True)

def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
"""
Expand Up @@ -69,7 +69,8 @@ def __init__(self,
dataset_id,
sandbox_dataset_id,
clean_survey_dataset_id=None,
-table_namer=None):
+table_namer=None,
+run_for_synthetic=True):
"""
Initialize the class with proper info.
@@ -27,10 +27,10 @@

SANDBOX_QUERY = JINJA_ENV.from_string("""
CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}` AS (
-SELECT *
+SELECT *
FROM `{{project_id}}.{{dataset_id}}.survey_conduct`
WHERE survey_conduct_id NOT IN (
-SELECT DISTINCT questionnaire_response_id
+SELECT DISTINCT questionnaire_response_id
FROM `{{project_id}}.{{dataset_id}}.observation`
WHERE questionnaire_response_id IS NOT NULL
)
@@ -40,7 +40,7 @@
DELETE_QUERY = JINJA_ENV.from_string("""
DELETE FROM `{{project_id}}.{{dataset_id}}.survey_conduct`
WHERE survey_conduct_id IN (
-SELECT DISTINCT survey_conduct_id
+SELECT DISTINCT survey_conduct_id
FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`
)
""")
@@ -69,7 +69,8 @@ def __init__(self,
project_id=project_id,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
-table_namer=table_namer)
+table_namer=table_namer,
+run_for_synthetic=True)

def setup_rule(self, client):
"""
@@ -37,7 +37,7 @@
SELECT o.*
FROM `{{project_id}}.{{dataset_id}}.observation` o
LEFT JOIN `{{project_id}}.{{dataset_id}}.survey_conduct` sc
-ON sc.survey_conduct_id = o.questionnaire_response_id
+ON sc.survey_conduct_id = o.questionnaire_response_id
WHERE sc.survey_source_concept_id = 0 OR sc.survey_concept_id = 0
)
""")
@@ -53,15 +53,15 @@
CLEAN_OBSERVATION = JINJA_ENV.from_string("""
DELETE FROM `{{project_id}}.{{dataset_id}}.observation`
WHERE questionnaire_response_id IN (
-SELECT questionnaire_response_id
+SELECT questionnaire_response_id
FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`
)
""")

CLEAN_SURVEY_CONDUCT = JINJA_ENV.from_string("""
DELETE FROM `{{project_id}}.{{dataset_id}}.survey_conduct`
WHERE survey_conduct_id IN (
-SELECT survey_conduct_id
+SELECT survey_conduct_id
FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`
)
""")
@@ -72,7 +72,7 @@
COUNT(CASE WHEN questionnaire_response_id IS NOT NULL THEN 1 ELSE 0 END) as invalid_obs
FROM `{{project_id}}.{{dataset_id}}.survey_conduct` sc
LEFT JOIN `{{project_id}}.{{dataset_id}}.observation` o
-ON sc.survey_conduct_id = o.questionnaire_response_id
+ON sc.survey_conduct_id = o.questionnaire_response_id
WHERE sc.survey_source_concept_id = 0 OR sc.survey_concept_id = 0
""")

@@ -106,7 +106,8 @@ def __init__(self,
CleanSurveyConductRecurringSurveys,
UpdateSurveySourceConceptId
],
-table_namer=table_namer)
+table_namer=table_namer,
+run_for_synthetic=True)

self.counts_query = COUNTS_QUERY.render(project_id=self.project_id,
dataset_id=self.dataset_id)
@@ -36,7 +36,7 @@
CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}` AS (
SELECT * FROM `{{project_id}}.{{dataset_id}}.survey_conduct`
WHERE survey_concept_id != survey_source_concept_id
-OR survey_concept_id NOT IN (SELECT concept_id FROM `{{project_id}}.{{dataset_id}}.concept` WHERE vocabulary_id IN ('PPI','AoU_Custom','AoU_General'))
+OR survey_concept_id NOT IN (SELECT concept_id FROM `{{project_id}}.{{dataset_id}}.concept` WHERE vocabulary_id IN ('PPI','AoU_Custom','AoU_General'))
)
""")

@@ -80,7 +80,8 @@ def __init__(self,
dataset_id=dataset_id,
sandbox_dataset_id=sandbox_dataset_id,
depends_on=[CleanSurveyConductRecurringSurveys],
-table_namer=table_namer)
+table_namer=table_namer,
+run_for_synthetic=True)

def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
"""
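The constructor hunks above and in the other cleaning-rule files all make the same one-line change: the rule forwards run_for_synthetic=True to its base class so the synthetic pipeline picks it up. A minimal, self-contained sketch of that forwarding follows; the rule and base-class names here are placeholders, not the project's real classes.

class BaseCleaningRule:
    """Stand-in for the real base class; here it only records the synthetic flag."""

    def __init__(self, run_for_synthetic=False, **kwargs):
        self.run_for_synthetic = run_for_synthetic
        for name, value in kwargs.items():
            setattr(self, name, value)


class ExampleSurveyConductRule(BaseCleaningRule):
    """Placeholder rule that opts in to synthetic-dataset runs."""

    def __init__(self, project_id, dataset_id, sandbox_dataset_id, table_namer=None):
        super().__init__(project_id=project_id,
                         dataset_id=dataset_id,
                         sandbox_dataset_id=sandbox_dataset_id,
                         table_namer=table_namer,
                         # the one-line change this commit repeats in each rule:
                         run_for_synthetic=True)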
@@ -98,7 +98,7 @@
FROM `{{rdr_dataset_id}}.{{domain_table}}` AS t
JOIN `{{rdr_dataset_id}}._mapping_{{domain_table}}` AS v
ON t.{{domain_table}}_id = v.{{domain_table}}_id
-{% if domain_table not in ['survey_conduct', 'person'] %}
+{% if domain_table not in ['survey_conduct', 'person'] and 'synthetic' not in combined_sandbox_dataset_id %}
UNION ALL
SELECT DISTINCT
'{{ehr_dataset_id}}' AS src_dataset_id,
@@ -115,6 +115,8 @@
WHERE t.person_id = c.person_id)
{% endif %}
{% endif %}
+-- closes the synthetic only needs if/else clause --
+{% endif %}
""")

LOAD_QUERY = JINJA_ENV.from_string("""
101 changes: 8 additions & 93 deletions data_steward/tools/create_synthetic.py
@@ -10,7 +10,7 @@
# Project imports
from cdr_cleaner import clean_cdr
from cdr_cleaner.args_parser import add_kwargs_to_args
-from common import CDR_SCOPES, FITBIT_TABLES, ID_CONSTANT_FACTOR, JINJA_ENV
+from common import CDR_SCOPES, FITBIT_TABLES, ID_CONSTANT_FACTOR, JINJA_ENV, SURVEY_CONDUCT
from constants.cdr_cleaner import clean_cdr as consts
from constants.tools import create_combined_backup_dataset as combine_consts
from gcloud.bq import BigQueryClient
@@ -114,91 +114,6 @@ def create_datasets(client: BigQueryClient, name: str, input_dataset: str,
return datasets


-def _fix_survey_conduct_records(client, project_id, staging_ds, sandbox_ds):
-# sandbox and drop orphaned records
-que = JINJA_ENV.from_string("""
-CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2714_rm_orphan_survey_conduct` AS (
-SELECT * FROM `{{project_id}}.{{staging_ds}}.survey_conduct` sc
-WHERE NOT EXISTS (
-SELECT 1 FROM `{{project_id}}.{{staging_ds}}.observation` o
-WHERE sc.survey_conduct_id = o.questionnaire_response_id));
-DELETE FROM `{{project_id}}.{{staging_ds}}.survey_conduct` sc
-WHERE EXISTS (
-SELECT 1 FROM `{{project_id}}.{{sandbox_ds}}.rdr_dc2714_rm_orphan_survey_conduct` sb
-WHERE sc.survey_conduct_id = sb.survey_conduct_id
-);""")
-que = que.render(project_id=project_id,
-staging_ds=staging_ds,
-sandbox_ds=sandbox_ds)

-# This is now done in the pipeline.
-# IF the next fix is implemented in the pipeline, this function can be removed.
-# resp = client.query(que)
-# resp.result()

-# fix cope survey responses
-que = (f"""
--- save all cope and minute survey responses --
-CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2713_survey_conduct` AS (
-SELECT *
-FROM `{{project_id}}.{{staging_ds}}.survey_conduct`
-WHERE REGEXP_CONTAINS(survey_source_value, r'(?i)(^cope$)|(^cope_)')
-);
--- update cope and minute survey responses --
-UPDATE `{{project_id}}.{{staging_ds}}.survey_conduct` s
-SET survey_concept_id = CASE WHEN m.cope_month = 'may' THEN 2100000002
-WHEN m.cope_month = 'june' THEN 2100000003
-WHEN m.cope_month = 'july' THEN 2100000004
-WHEN m.cope_month = 'nov' THEN 2100000005
-WHEN m.cope_month = 'dec' THEN 2100000006
-WHEN m.cope_month = 'feb' THEN 2100000007
-WHEN m.cope_month = 'vaccine1' THEN 905047
-WHEN m.cope_month = 'vaccine2' THEN 905055
-WHEN m.cope_month = 'vaccine3' THEN 765936
-WHEN m.cope_month = 'vaccine4' THEN 1741006
-ELSE s.survey_concept_id
-END,
-survey_source_concept_id = CASE
-WHEN m.cope_month = 'may' THEN 2100000002
-WHEN m.cope_month = 'june' THEN 2100000003
-WHEN m.cope_month = 'july' THEN 2100000004
-WHEN m.cope_month = 'nov' THEN 2100000005
-WHEN m.cope_month = 'dec' THEN 2100000006
-WHEN m.cope_month = 'feb' THEN 2100000007
-WHEN m.cope_month = 'vaccine1' THEN 905047
-WHEN m.cope_month = 'vaccine2' THEN 905055
-WHEN m.cope_month = 'vaccine3' THEN 765936
-WHEN m.cope_month = 'vaccine4' THEN 1741006
-ELSE s.survey_concept_id
-END
-FROM `{{project_id}}.{{staging_ds}}.cope_survey_semantic_version_map` m
-WHERE s.survey_conduct_id = m.questionnaire_response_id;
--- save all records that will be changed --
-CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2713_survey_conduct_source_value` AS (
-SELECT *
-FROM `{{project_id}}.{{staging_ds}}.survey_conduct` s
-LEFT JOIN `{{project_id}}.{{staging_ds}}.concept` c
-ON s.survey_concept_id = c.concept_id
-AND survey_concept_id = 0
-);
--- update the survey_source_value field --
-UPDATE `{{project_id}}.{{staging_ds}}.survey_conduct` s
-SET s.survey_source_value = c.concept_code
-FROM `{{project_id}}.{{staging_ds}}.concept` c
-WHERE s.survey_concept_id = c.concept_id
-AND s.survey_concept_id > 0;
-""")
-que = que.render(project_id=project_id,
-staging_ds=staging_ds,
-sandbox_ds=sandbox_ds)
-resp = client.query(que)
-return resp.result()


def _create_empty_fitbit_tables(bq_client, project_id, final_dataset_name):
for table in FITBIT_TABLES:
schema_list = fields_for(table)
@@ -254,7 +169,7 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
# 1. add mapping tables
# EHR consent table is not added because we are not generating
# synthetic EHR data. There will not be any to map.
-for domain_table in combine_consts.DOMAIN_TABLES:
+for domain_table in combine_consts.DOMAIN_TABLES + [SURVEY_CONDUCT]:
LOGGER.info(f'Mapping {domain_table}...')
generate_combined_mapping_tables(bq_client, domain_table,
datasets[consts.STAGING], '',
@@ -266,12 +181,6 @@
bq_client,
f'{project_id}.{datasets[consts.STAGING]}.{domain_table}')

-LOGGER.warning(
-"Is `_fix_survey_conduct_records` still needed for generating synthetic data? "
-"If unnecessary, remove the function and the call line.")
-_fix_survey_conduct_records(bq_client, project_id, datasets[consts.STAGING],
-datasets[consts.SANDBOX])

# Run cleaning rules
cleaning_args = [
'-p', project_id, '-d', datasets[consts.STAGING], '-b',
@@ -378,6 +287,12 @@ def main(raw_args=None) -> dict:
datasets = create_tier(args.project_id, dataset_id, args.release_tag,
args.target_principal, **kwargs)

+LOGGER.info(
+f"Dataset with synthetic survey data created at `{args.project_id}.{dataset_id}`.\n"
+f"Review dataset before publishing to CB dev environment.\n"
+f"If changes are needed (schema, data, etc.), make them as appropriate. If changes are\n"
+f"made to data only, be sure to make tickets to change the synthetic script."
+)
return datasets


