diff --git a/data_steward/common.py b/data_steward/common.py index f608b87479..3e0c4e2cb7 100644 --- a/data_steward/common.py +++ b/data_steward/common.py @@ -275,6 +275,9 @@ ZIP_CODE_AGGREGATION_MAP = 'zip_code_aggregation_map' DEID_QUESTIONNAIRE_RESPONSE_MAP = '_deid_questionnaire_response_map' +OPERATIONS_ANALYTICS = 'operations_analytics' +EHR_UPLOAD_PIDS = 'ehr_upload_pids' + AIAN_LIST = 'aian_list' # Participant Summary diff --git a/data_steward/resource_files/schemas/internal/lookup_tables/hpo_site_id_mappings.json b/data_steward/resource_files/schemas/internal/lookup_tables/hpo_site_id_mappings.json new file mode 100644 index 0000000000..04b70a450e --- /dev/null +++ b/data_steward/resource_files/schemas/internal/lookup_tables/hpo_site_id_mappings.json @@ -0,0 +1,26 @@ +[ + { + "type": "string", + "name": "Org_ID", + "mode": "nullable", + "description": "Organization ID of the HPO site." + }, + { + "type": "string", + "name": "HPO_ID", + "mode": "nullable", + "description": "HPO ID of the HPO site." + }, + { + "type": "string", + "name": "Site_Name", + "mode": "nullable", + "description": "HPO site name." + }, + { + "type": "integer", + "name": "Display_Order", + "mode": "nullable", + "description": "Order in which HPO sites are shown in the report." 
+ } +] \ No newline at end of file diff --git a/data_steward/tools/generate_ehr_upload_pids.py b/data_steward/tools/generate_ehr_upload_pids.py index a125c293e2..dc651343c6 100644 --- a/data_steward/tools/generate_ehr_upload_pids.py +++ b/data_steward/tools/generate_ehr_upload_pids.py @@ -4,8 +4,10 @@ from gcloud.bq import BigQueryClient -from constants.utils import bq as bq_consts -from common import JINJA_ENV +from constants.utils.bq import LOOKUP_TABLES_DATASET_ID, HPO_SITE_ID_MAPPINGS_TABLE_ID +from common import (CONDITION_OCCURRENCE, DRUG_EXPOSURE, EHR_UPLOAD_PIDS, + JINJA_ENV, OBSERVATION, OPERATIONS_ANALYTICS, PERSON, + PROCEDURE_OCCURRENCE, VISIT_OCCURRENCE) LOGGER = logging.getLogger(__name__) @@ -15,76 +17,88 @@ LOGGER.addHandler(handler) LOGGER.setLevel(logging.DEBUG) -EHR_UPLOAD_PIDS_BQ_SCRIPT = JINJA_ENV.from_string(''' -SELECT ARRAY_TO_STRING(ARRAY_AGG(FORMAT -("""SELECT - person_id, - current_datetime() AS report_run_time, - Org_ID as org_id, - HPO_ID as hpo_id, - Site_Name as site_name, - TIMESTAMP_MICROS(t.last_modified_time * 1000) AS latest_upload_time +pid_tables = [ + PERSON, CONDITION_OCCURRENCE, PROCEDURE_OCCURRENCE, DRUG_EXPOSURE, + OBSERVATION, VISIT_OCCURRENCE +] + +HPO_IDS_QUERY = JINJA_ENV.from_string(""" +SELECT LOWER(hpo_id) AS hpo_id FROM `{{project_id}}.{{dataset_id}}.{{table_id}}` +""") + +EHR_UPLOAD_PIDS_BQ_QUERY = JINJA_ENV.from_string(""" +CREATE OR REPLACE VIEW `{{project_id}}.{{operations_dataset_id}}.{{ehr_pids_view}}` +OPTIONS(description='A participant-level view of when ehr data was sent. 
NOTE: the RDR calls this view to support HealthPro (1/27/21)') +AS {% for hpo_site in hpo_sites %} +SELECT + person_id, + current_datetime() AS report_run_time, + Org_ID as org_id, + HPO_ID as hpo_id, + Site_Name as site_name, + TIMESTAMP_MICROS(t.last_modified_time * 1000) AS latest_upload_time FROM - `{{project_id}}.{{ehr_dataset_id}}.%s_person` p, - `{{project_id}}.{{lookup_dataset_id}}.{{hpo_mappings}}` m, - `{{project_id}}.{{ehr_dataset_id}}.__TABLES__` t - WHERE t.table_id = '%s_person' - AND m.HPO_ID = '%s' - AND person_id IN ( - SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_person` UNION DISTINCT - SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_condition_occurrence` UNION DISTINCT - SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_procedure_occurrence` UNION DISTINCT - SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_drug_exposure` UNION DISTINCT - SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_observation` UNION DISTINCT - SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_visit_occurrence`)""", - LOWER(HPO_ID), LOWER(HPO_ID), HPO_ID, LOWER(HPO_ID), LOWER(HPO_ID), LOWER(HPO_ID), - LOWER(HPO_ID), LOWER(HPO_ID), LOWER(HPO_ID))), "\\nUNION ALL \\n") as q -FROM `{{project_id}}.{{lookup_dataset_id}}.{{hpo_mappings}}` -WHERE HPO_ID NOT IN ({{excluded_sites_str}}) -''') - - -def get_excluded_hpo_ids_str(excluded_hpo_ids): - """ - Formats list of hpo_ids or None to add to bq script, adds empty hpo_id + `{{project_id}}.{{ehr_dataset_id}}.{{hpo_site}}_person` p, + `{{project_id}}.{{lookup_dataset_id}}.{{hpo_mappings}}` m, + `{{project_id}}.{{ehr_dataset_id}}.__TABLES__` t +WHERE t.table_id = '{{hpo_site}}_person' +AND LOWER(m.HPO_ID) = '{{hpo_site}}' +AND person_id IN ( + {% for pid_table in pid_tables %} + SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.{{hpo_site}}_{{pid_table}}` + {% if not loop.last %} UNION DISTINCT {% endif %} + {% endfor %} +){% if not loop.last %} UNION ALL {% 
endif %} +{% endfor %} +""") - :param excluded_hpo_ids: List output by args parser or None - :return: String of hpo_ids enclosed in single quotes along with empty hpo_ids - """ - if excluded_hpo_ids is None: - excluded_hpo_ids = [] - # use uppercase for all hpo_ids as is in the table - excluded_hpo_ids = [hpo_id.upper() for hpo_id in excluded_hpo_ids] - # exclude empty site since lookup table contains it - excluded_hpo_ids.append('') - excluded_hpo_ids_str = ', '.join( - [f"'{hpo_id}'" for hpo_id in excluded_hpo_ids]) - return excluded_hpo_ids_str - - -def generate_ehr_upload_pids_query(project_id, - ehr_dataset_id, - excluded_hpo_ids=None): + +def update_ehr_upload_pids_view(project_id, ehr_dataset_id, bq_client=None): """ - Generate query for all hpo_ids except specified + Update (=create or replace) ehr_upload_pids view. :param project_id: Identifies the project :param ehr_dataset_id: Identifies the ehr dataset - :param excluded_hpo_ids: List of sites - :return: Query string to use in ehr_upload_pids view + :param bq_client: BigQuery client + :return: """ - bq_client = BigQueryClient(project_id) - excluded_hpo_ids_str = get_excluded_hpo_ids_str(excluded_hpo_ids) - query = EHR_UPLOAD_PIDS_BQ_SCRIPT.render( + if not bq_client: + bq_client = BigQueryClient(project_id) + + hpo_query = HPO_IDS_QUERY.render(project_id=project_id, + dataset_id=LOOKUP_TABLES_DATASET_ID, + table_id=HPO_SITE_ID_MAPPINGS_TABLE_ID) + + response = bq_client.query(hpo_query) + result = list(response.result()) + hpo_sites = [row[0] for row in result] + + hpo_sites_with_submission = [ + hpo_id for hpo_id in hpo_sites if all( + bq_client.table_exists(f'{hpo_id}_{table}', ehr_dataset_id) + for table in pid_tables) + ] + LOGGER.info( + f'The following HPO sites will be included in the view `{project_id}.{OPERATIONS_ANALYTICS}.{EHR_UPLOAD_PIDS}`. 
' + 'These sites are listed in the site mapping table and they have submitted files: ' + ) + LOGGER.info(', '.join(hpo_sites_with_submission)) + + query = EHR_UPLOAD_PIDS_BQ_QUERY.render( project_id=project_id, + operations_dataset_id=OPERATIONS_ANALYTICS, + ehr_pids_view=EHR_UPLOAD_PIDS, ehr_dataset_id=ehr_dataset_id, - lookup_dataset_id=bq_consts.LOOKUP_TABLES_DATASET_ID, - hpo_mappings=bq_consts.HPO_SITE_ID_MAPPINGS_TABLE_ID, - excluded_sites_str=excluded_hpo_ids_str) - query_job = bq_client.query(query) - res = query_job.result().to_dataframe() - full_query = res["q"].to_list()[0] - return full_query + lookup_dataset_id=LOOKUP_TABLES_DATASET_ID, + hpo_mappings=HPO_SITE_ID_MAPPINGS_TABLE_ID, + hpo_sites=hpo_sites_with_submission, + pid_tables=pid_tables) + + _ = bq_client.query(query).result() + + LOGGER.info( + "The view is updated. Ensure the view is accessible without errors: " + f"`{project_id}.{OPERATIONS_ANALYTICS}.{EHR_UPLOAD_PIDS}`") def get_args_parser(): @@ -103,19 +117,15 @@ def get_args_parser(): action='store', help='Identifies the ehr dataset', required=True) - parser.add_argument('-i', - '--excluded_hpo_ids', - action='store', - nargs='+', - dest='excluded_hpo_ids', - help='Identifies sites with no tables in ehr dataset', - required=False) return parser if __name__ == '__main__': args_parser = get_args_parser() args = args_parser.parse_args() - ehr_upload_pids_query = generate_ehr_upload_pids_query( - args.project_id, args.ehr_dataset_id, args.excluded_hpo_ids) - LOGGER.info(ehr_upload_pids_query) + + bq_client = BigQueryClient(args.project_id) + + update_ehr_upload_pids_view(args.project_id, + args.ehr_dataset_id, + bq_client=bq_client) diff --git a/tests/integration_tests/data_steward/tools/generate_ehr_upload_pids_test.py b/tests/integration_tests/data_steward/tools/generate_ehr_upload_pids_test.py index 27ae2c0afc..7ceff8669c 100644 --- a/tests/integration_tests/data_steward/tools/generate_ehr_upload_pids_test.py +++ 
b/tests/integration_tests/data_steward/tools/generate_ehr_upload_pids_test.py @@ -1,15 +1,78 @@ # Python imports import unittest -import os +import mock # Project imports import app_identity +from common import (EHR_UPLOAD_PIDS, JINJA_ENV, OBSERVATION, PERSON, + UNIONED_DATASET_ID) from gcloud.bq import BigQueryClient -from constants.utils.bq import LOOKUP_TABLES_DATASET_ID, HPO_SITE_ID_MAPPINGS_TABLE_ID +from constants.utils.bq import HPO_SITE_ID_MAPPINGS_TABLE_ID +from resources import fields_for from tools import generate_ehr_upload_pids as eup +INSERT_HPO_SITE_ID_MAPPINGS = JINJA_ENV.from_string(""" +INSERT INTO `{{project_id}}.{{dataset_id}}.{{table}}` +VALUES + ('Fake Org', 'FAKE', 'Fake site', 1), + ('NY Org', 'NYC', 'Site in NY', 2), + ('CHS Org', 'CHS', 'CHS site', 3), + ('Pitt Org', 'Pitt', 'Pitt site', 4) +""") + +INSERT_NYC_PERSON = JINJA_ENV.from_string(""" +INSERT INTO `{{project_id}}.{{dataset_id}}.nyc_person` ( + person_id, gender_concept_id, year_of_birth, birth_datetime, + race_concept_id, ethnicity_concept_id +) VALUES + (1, 0, 2000, '2000-01-01 01:00:00 UTC', 0, 0), + (2, 0, 2000, '2000-01-01 01:00:00 UTC', 0, 0), + (3, 0, 2000, '2000-01-01 01:00:00 UTC', 0, 0) +""") + +INSERT_NYC_OBSERVATION = JINJA_ENV.from_string(""" +INSERT INTO `{{project_id}}.{{dataset_id}}.nyc_observation` ( + observation_id, person_id, observation_concept_id, observation_date, + observation_datetime, observation_type_concept_id +) VALUES + (101, 1, 0, '2000-01-01', '2001-01-01 01:00:00 UTC', 0), + (102, 2, 0, '2000-01-01', '2001-01-01 01:00:00 UTC', 0) +""") + +INSERT_PITT_PERSON = JINJA_ENV.from_string(""" +INSERT INTO `{{project_id}}.{{dataset_id}}.pitt_person` ( + person_id, gender_concept_id, year_of_birth, birth_datetime, + race_concept_id, ethnicity_concept_id +) VALUES + (1, 0, 2000, '2000-01-01 01:00:00 UTC', 0, 0), + (4, 0, 2000, '2000-01-01 01:00:00 UTC', 0, 0), + (5, 0, 2000, '2000-01-01 01:00:00 UTC', 0, 0) +""") + +INSERT_PITT_OBSERVATION = 
JINJA_ENV.from_string(""" +INSERT INTO `{{project_id}}.{{dataset_id}}.pitt_observation` ( + observation_id, person_id, observation_concept_id, observation_date, + observation_datetime, observation_type_concept_id +) VALUES + (101, 1, 0, '2000-01-01', '2001-01-01 01:00:00 UTC', 0), + (104, 4, 0, '2000-01-01', '2001-01-01 01:00:00 UTC', 0) +""") + +INSERT_CHS_PERSON = JINJA_ENV.from_string(""" +INSERT INTO `{{project_id}}.{{dataset_id}}.chs_person` ( + person_id, gender_concept_id, year_of_birth, birth_datetime, + race_concept_id, ethnicity_concept_id +) VALUES + (999, 0, 2000, '2000-01-01 01:00:00 UTC', 0, 0) +""") + +ASSERT_QUERY = JINJA_ENV.from_string(""" +SELECT person_id, HPO_ID FROM `{{project_id}}.{{dataset_id}}.{{table_id}}` +""") + class GenerateEhrUploadPids(unittest.TestCase): + dataset_id = UNIONED_DATASET_ID @classmethod def setUpClass(cls): @@ -19,16 +82,95 @@ def setUpClass(cls): def setUp(self): self.project_id = app_identity.get_application_id() - self.dataset_id = os.environ.get('UNIONED_DATASET_ID') self.bq_client = BigQueryClient(self.project_id) + self.fq_table_names = [ + f'{self.project_id}.{self.dataset_id}.{HPO_SITE_ID_MAPPINGS_TABLE_ID}' + ] + [ + f'{self.project_id}.{self.dataset_id}.nyc_{table}' + for table in eup.pid_tables + ] + [ + f'{self.project_id}.{self.dataset_id}.pitt_{table}' + for table in eup.pid_tables + ] + [ + f'{self.project_id}.{self.dataset_id}.chs_{PERSON}', + f'{self.project_id}.{self.dataset_id}.chs_{OBSERVATION}' + ] + + self.bq_client.create_tables( + self.fq_table_names, + exists_ok=True, + fields=[fields_for(HPO_SITE_ID_MAPPINGS_TABLE_ID)] + + [fields_for(table) for table in eup.pid_tables] + + [fields_for(table) for table in eup.pid_tables] + + [fields_for(PERSON), fields_for(OBSERVATION)]) + + insert_queries = [ + INSERT_HPO_SITE_ID_MAPPINGS, INSERT_NYC_PERSON, + INSERT_NYC_OBSERVATION, INSERT_PITT_PERSON, INSERT_PITT_OBSERVATION, + INSERT_CHS_PERSON + ] + + for query in insert_queries: + q = 
query.render(project_id=self.project_id, + dataset_id=self.dataset_id, + table=HPO_SITE_ID_MAPPINGS_TABLE_ID) + _ = self.bq_client.query(q).result() + + super().setUp() + + @mock.patch("tools.generate_ehr_upload_pids.OPERATIONS_ANALYTICS", + dataset_id) + @mock.patch("tools.generate_ehr_upload_pids.LOOKUP_TABLES_DATASET_ID", + dataset_id) def test_generate_ehr_upload_pids_query(self): - hpo_query = f"SELECT hpo_id FROM `{self.project_id}.{LOOKUP_TABLES_DATASET_ID}.{HPO_SITE_ID_MAPPINGS_TABLE_ID}`" - hpo_query_job = self.bq_client.query(hpo_query) - hpo_ids = hpo_query_job.result().to_dataframe()["hpo_id"].to_list() - hpo_ids.sort() - query = eup.generate_ehr_upload_pids_query(self.project_id, - self.dataset_id) - queries = query.split("\nUNION ALL \n") - for i, query in enumerate(sorted(queries)): - self.assertIn(hpo_ids[i], query) + """ + Test case to confirm the ehr_upload_pids is updated as expected. + HPO sites in this test: + FAKE: This one has no submission. Will not be included in the view. + CHS: This one has incomplete submission. Will not be included in the view. + NYC and PITT: These have complete submissions. Will be included in the view. + All the person_ids from NYC and PITT are included in the view. 
+ """ + expected = [{ + 'person_id': 1, + 'HPO_ID': 'NYC' + }, { + 'person_id': 2, + 'HPO_ID': 'NYC' + }, { + 'person_id': 3, + 'HPO_ID': 'NYC' + }, { + 'person_id': 1, + 'HPO_ID': 'Pitt' + }, { + 'person_id': 4, + 'HPO_ID': 'Pitt' + }, { + 'person_id': 5, + 'HPO_ID': 'Pitt' + }] + + eup.update_ehr_upload_pids_view(self.project_id, + self.dataset_id, + bq_client=self.bq_client) + + assert_query = ASSERT_QUERY.render(project_id=self.project_id, + dataset_id=self.dataset_id, + table_id=EHR_UPLOAD_PIDS) + assert_job = self.bq_client.query(assert_query) + actual_result = list(assert_job.result()) + actual = [dict(row.items()) for row in actual_result] + self.assertCountEqual(actual, expected) + + def tearDown(self): + for table in self.fq_table_names: + self.bq_client.delete_table(table, not_found_ok=True) + + # NOTE This is a view. delete_table() can delete views. + self.bq_client.delete_table( + f'{self.project_id}.{self.dataset_id}.{EHR_UPLOAD_PIDS}', + not_found_ok=True) + + super().tearDown() diff --git a/tests/unit_tests/data_steward/tools/generate_ehr_upload_pids_test.py b/tests/unit_tests/data_steward/tools/generate_ehr_upload_pids_test.py deleted file mode 100644 index 6c653ac54d..0000000000 --- a/tests/unit_tests/data_steward/tools/generate_ehr_upload_pids_test.py +++ /dev/null @@ -1,49 +0,0 @@ -import unittest - -from tools import generate_ehr_upload_pids as eup - - -class GenerateEhrUploadPids(unittest.TestCase): - - @classmethod - def setUpClass(cls): - print('**************************************************************') - print(cls.__name__) - print('**************************************************************') - - def setUp(self): - self.project_id = "fake_project" - self.dataset_id = "fake_dataset" - - def test_get_excluded_hpo_ids_str(self): - hpo_ids = ["hpo_id_1", "hpo_id_2"] - expected = "'HPO_ID_1', 'HPO_ID_2', ''" - actual = eup.get_excluded_hpo_ids_str(hpo_ids) - self.assertEqual(actual, expected) - - hpo_ids = None - expected = "''" - 
actual = eup.get_excluded_hpo_ids_str(hpo_ids) - self.assertEqual(actual, expected) - - def test_get_args_parser(self): - parser = eup.get_args_parser() - - input_args = [ - '-p', self.project_id, '-d', self.dataset_id, '-i', 'hpo_id_1', - 'hpo_id_2' - ] - expected_hpo_ids = ['hpo_id_1', 'hpo_id_2'] - args = parser.parse_args(input_args) - self.assertListEqual(args.excluded_hpo_ids, expected_hpo_ids) - - input_args = [ - '-p', self.project_id, '-d', self.dataset_id, '-i', 'hpo_id_1' - ] - expected_hpo_ids = ['hpo_id_1'] - args = parser.parse_args(input_args) - self.assertListEqual(args.excluded_hpo_ids, expected_hpo_ids) - - input_args = ['-p', self.project_id, '-d', self.dataset_id] - args = parser.parse_args(input_args) - self.assertIsNone(args.excluded_hpo_ids)