From 8fb95e88a17260d0d6727f4d5e09636b128faf47 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 23 Oct 2023 12:40:42 -0700 Subject: [PATCH] feat(sqlparser): parse create DDL statements (#9002) --- .../goldens/v2_sqlite_operator.json | 162 +++++++++++++++--- .../v2_sqlite_operator_no_dag_listener.json | 162 +++++++++++++++--- .../datahub/emitter/sql_parsing_builder.py | 9 +- .../testing/check_sql_parser_result.py | 9 + .../src/datahub/utilities/sqlglot_lineage.py | 92 ++++++++-- .../test_bigquery_create_view_with_cte.json | 8 +- ..._bigquery_from_sharded_table_wildcard.json | 4 +- .../test_bigquery_nested_subqueries.json | 4 +- ..._bigquery_sharded_table_normalization.json | 4 +- .../test_bigquery_star_with_replace.json | 6 +- .../test_bigquery_view_from_union.json | 4 +- .../goldens/test_create_table_ddl.json | 55 +++++- .../goldens/test_create_view_as_select.json | 2 +- .../test_select_from_struct_subfields.json | 2 +- .../test_select_with_full_col_name.json | 2 +- .../test_teradata_default_normalization.json | 2 + 16 files changed, 430 insertions(+), 97 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index 1a32b38ce055d9..81d0a71b651d96 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -74,9 +74,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -98,7 +96,44 @@ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], "inputDatajobs": [], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + } + ] } } }, @@ -157,7 +192,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:24.632190+00:00", + "start_date": "2023-10-15 20:29:10.262813+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -172,7 +207,7 @@ "name": "sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056984632, + "time": 1697401750262, "actor": "urn:li:corpuser:datahub" } } @@ -221,7 +256,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056984632, + "timestampMillis": 1697401750262, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -251,9 +286,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -275,7 +308,80 @@ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], "inputDatajobs": [], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + } + ] } } }, @@ -331,7 +437,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056984947, + "timestampMillis": 1697401750651, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -447,7 +553,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:28.605901+00:00", + "start_date": "2023-10-15 20:29:15.013834+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -462,7 +568,7 @@ "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056988605, + "time": 1697401755013, "actor": "urn:li:corpuser:datahub" } } @@ -511,7 +617,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056988605, + "timestampMillis": 1697401755013, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -621,7 +727,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056989098, + "timestampMillis": 1697401755600, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -807,7 +913,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:32.888165+00:00", + "start_date": "2023-10-15 20:29:20.216818+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -822,7 +928,7 @@ "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056992888, + "time": 1697401760216, "actor": "urn:li:corpuser:datahub" } } @@ -895,7 +1001,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056992888, + "timestampMillis": 1697401760216, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1131,7 +1237,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056993744, + "timestampMillis": 1697401761237, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1249,7 +1355,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:37.745717+00:00", + "start_date": "2023-10-15 20:29:26.243934+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1264,7 +1370,7 @@ "name": "sqlite_operator_cleanup_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056997745, + "time": 1697401766243, "actor": "urn:li:corpuser:datahub" } } @@ -1313,7 +1419,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056997745, + "timestampMillis": 1697401766243, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1425,7 +1531,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056998672, + "timestampMillis": 1697401767373, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1543,7 +1649,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:56:42.645806+00:00", + "start_date": "2023-10-15 20:29:32.075613+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1558,7 +1664,7 @@ "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057002645, + "time": 1697401772075, "actor": "urn:li:corpuser:datahub" } } @@ -1607,7 +1713,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057002645, + "timestampMillis": 1697401772075, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1719,7 +1825,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057003759, + "timestampMillis": 1697401773454, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json index c082be693e30c0..96a0f02ccec17f 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json @@ -74,9 +74,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -98,7 +96,44 @@ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], "inputDatajobs": [], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + } + ] } } }, @@ -157,7 +192,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:00:45.832554+00:00", + "start_date": "2023-10-15 20:27:26.883178+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -172,7 +207,7 @@ "name": "sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057245832, + "time": 1697401646883, "actor": "urn:li:corpuser:datahub" } } @@ -221,7 +256,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057245832, + "timestampMillis": 1697401646883, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -251,9 +286,7 @@ "downstream_task_ids": "['populate_cost_table']", "inlets": "[]", "outlets": "[]", - "datahub_sql_parser_error": "Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE TABLE IF NOT EXISTS costs (\\n id INTEGER PRIMARY KEY,\\n month TEXT NOT NULL,\\n total_cost REAL NOT NULL,\\n area REAL NOT NULL\\n )\\n \"}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=create_cost_table", "name": "create_cost_table", @@ -275,7 +308,80 @@ "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" ], "inputDatajobs": [], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),month)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),total_cost)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD),area)" + ], + "confidenceScore": 1.0 + } + ] } } }, @@ -331,7 +437,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057246734, + "timestampMillis": 1697401647826, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -502,7 +608,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:00:49.653938+00:00", + "start_date": "2023-10-15 20:27:31.398799+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -517,7 +623,7 @@ "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057249653, + "time": 1697401651398, "actor": "urn:li:corpuser:datahub" } } @@ -566,7 +672,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057249653, + "timestampMillis": 1697401651398, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -676,7 +782,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057250831, + "timestampMillis": 1697401652651, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -917,7 +1023,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:00:53.989264+00:00", + "start_date": "2023-10-15 20:27:37.697995+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -932,7 +1038,7 @@ "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057253989, + "time": 1697401657697, "actor": "urn:li:corpuser:datahub" } } @@ -1005,7 +1111,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057253989, + "timestampMillis": 1697401657697, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1241,7 +1347,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057255628, + "timestampMillis": 1697401659496, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1414,7 +1520,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:01:00.421177+00:00", + "start_date": "2023-10-15 20:27:45.670215+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1429,7 +1535,7 @@ "name": "sqlite_operator_cleanup_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057260421, + "time": 1697401665670, "actor": "urn:li:corpuser:datahub" } } @@ -1478,7 +1584,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057260421, + "timestampMillis": 1697401665670, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1590,7 +1696,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057262258, + "timestampMillis": 1697401667670, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1763,7 +1869,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 07:01:05.540192+00:00", + "start_date": "2023-10-15 20:27:51.559194+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1778,7 +1884,7 @@ "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057265540, + "time": 1697401671559, "actor": "urn:li:corpuser:datahub" } } @@ -1827,7 +1933,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057265540, + "timestampMillis": 1697401671559, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1939,7 +2045,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057267631, + "timestampMillis": 1697401673788, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py index 071d590f270f8b..dedcfa0385f75b 100644 --- a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py +++ b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py @@ -179,15 +179,16 @@ def add_lineage( def gen_workunits(self) -> Iterable[MetadataWorkUnit]: if self.generate_lineage: - yield from self._gen_lineage_workunits() + for mcp in self._gen_lineage_mcps(): + yield mcp.as_workunit() if self.generate_usage_statistics: yield from self._gen_usage_statistics_workunits() - def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]: + def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: for downstream_urn in self._lineage_map: upstreams: List[UpstreamClass] = [] fine_upstreams: List[FineGrainedLineageClass] = [] - for upstream_urn, edge in self._lineage_map[downstream_urn].items(): + for edge in self._lineage_map[downstream_urn].values(): upstreams.append(edge.gen_upstream_aspect()) fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects()) @@ -201,7 +202,7 @@ def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]: ) yield MetadataChangeProposalWrapper( entityUrn=downstream_urn, aspect=upstream_lineage - ).as_workunit() + ) def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]: yield from self._usage_aggregator.generate_workunits( diff --git a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py index b3b1331db768bb..2b610947e9043c 100644 --- a/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py +++ b/metadata-ingestion/src/datahub/testing/check_sql_parser_result.py @@ -24,6 +24,7 @@ def assert_sql_result_with_resolver( *, expected_file: pathlib.Path, schema_resolver: SchemaResolver, + allow_table_error: bool = False, **kwargs: Any, ) -> None: # HACK: Our BigQuery source overwrites this value and doesn't undo it. @@ -36,6 +37,14 @@ def assert_sql_result_with_resolver( **kwargs, ) + if res.debug_info.table_error: + if allow_table_error: + logger.info( + f"SQL parser table error: {res.debug_info.table_error}", + exc_info=res.debug_info.table_error, + ) + else: + raise res.debug_info.table_error if res.debug_info.column_error: logger.warning( f"SQL parser column error: {res.debug_info.column_error}", diff --git a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py index c830ec8c02fd44..97121b368f5078 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py @@ -241,9 +241,9 @@ class SqlParsingResult(_ParserBaseModel): ) -def _parse_statement(sql: str, dialect: str) -> sqlglot.Expression: - statement = sqlglot.parse_one( - sql, read=dialect, error_level=sqlglot.ErrorLevel.RAISE +def _parse_statement(sql: sqlglot.exp.ExpOrStr, dialect: str) -> sqlglot.Expression: + statement: sqlglot.Expression = sqlglot.maybe_parse( + sql, dialect=dialect, error_level=sqlglot.ErrorLevel.RAISE ) return statement @@ -467,14 +467,20 @@ def _column_level_lineage( # noqa: C901 default_db: Optional[str], default_schema: Optional[str], ) -> List[_ColumnLineageInfo]: - if not isinstance( - statement, - _SupportedColumnLineageTypesTuple, + is_create_ddl = _is_create_table_ddl(statement) + if ( + not isinstance( + statement, + _SupportedColumnLineageTypesTuple, + ) + and not is_create_ddl ): raise UnsupportedStatementTypeError( f"Can only generate column-level lineage for select-like inner statements, not {type(statement)}" ) + column_lineage: List[_ColumnLineageInfo] = [] + use_case_insensitive_cols = dialect in { # Column identifiers are case-insensitive in BigQuery, so we need to # do a normalization step beforehand to make sure it's resolved correctly. @@ -580,6 +586,38 @@ def _schema_aware_fuzzy_column_resolve( ) from e logger.debug("Qualified sql %s", statement.sql(pretty=True, dialect=dialect)) + # Handle the create DDL case. + if is_create_ddl: + assert ( + output_table is not None + ), "output_table must be set for create DDL statements" + + create_schema: sqlglot.exp.Schema = statement.this + sqlglot_columns = create_schema.expressions + + for column_def in sqlglot_columns: + if not isinstance(column_def, sqlglot.exp.ColumnDef): + # Ignore things like constraints. + continue + + output_col = _schema_aware_fuzzy_column_resolve( + output_table, column_def.name + ) + output_col_type = column_def.args.get("kind") + + column_lineage.append( + _ColumnLineageInfo( + downstream=_DownstreamColumnRef( + table=output_table, + column=output_col, + column_type=output_col_type, + ), + upstreams=[], + ) + ) + + return column_lineage + # Try to figure out the types of the output columns. try: statement = sqlglot.optimizer.annotate_types.annotate_types( @@ -589,8 +627,6 @@ def _schema_aware_fuzzy_column_resolve( # This is not a fatal error, so we can continue. logger.debug("sqlglot failed to annotate types: %s", e) - column_lineage = [] - try: assert isinstance(statement, _SupportedColumnLineageTypesTuple) @@ -599,7 +635,6 @@ def _schema_aware_fuzzy_column_resolve( (select_col.alias_or_name, select_col) for select_col in statement.selects ] logger.debug("output columns: %s", [col[0] for col in output_columns]) - output_col: str for output_col, original_col_expression in output_columns: if output_col == "*": # If schema information is available, the * will be expanded to the actual columns. @@ -628,7 +663,7 @@ def _schema_aware_fuzzy_column_resolve( # Generate SELECT lineage. # Using a set here to deduplicate upstreams. - direct_col_upstreams: Set[_ColumnRef] = set() + direct_raw_col_upstreams: Set[_ColumnRef] = set() for node in lineage_node.walk(): if node.downstream: # We only want the leaf nodes. @@ -643,8 +678,9 @@ def _schema_aware_fuzzy_column_resolve( if node.subfield: normalized_col = f"{normalized_col}.{node.subfield}" - col = _schema_aware_fuzzy_column_resolve(table_ref, normalized_col) - direct_col_upstreams.add(_ColumnRef(table=table_ref, column=col)) + direct_raw_col_upstreams.add( + _ColumnRef(table=table_ref, column=normalized_col) + ) else: # This branch doesn't matter. For example, a count(*) column would go here, and # we don't get any column-level lineage for that. @@ -665,7 +701,16 @@ def _schema_aware_fuzzy_column_resolve( if original_col_expression.type: output_col_type = original_col_expression.type - if not direct_col_upstreams: + # Fuzzy resolve upstream columns. + direct_resolved_col_upstreams = { + _ColumnRef( + table=edge.table, + column=_schema_aware_fuzzy_column_resolve(edge.table, edge.column), + ) + for edge in direct_raw_col_upstreams + } + + if not direct_resolved_col_upstreams: logger.debug(f' "{output_col}" has no upstreams') column_lineage.append( _ColumnLineageInfo( @@ -674,12 +719,12 @@ def _schema_aware_fuzzy_column_resolve( column=output_col, column_type=output_col_type, ), - upstreams=sorted(direct_col_upstreams), + upstreams=sorted(direct_resolved_col_upstreams), # logic=column_logic.sql(pretty=True, dialect=dialect), ) ) - # TODO: Also extract referenced columns (e.g. non-SELECT lineage) + # TODO: Also extract referenced columns (aka auxillary / non-SELECT lineage) except (sqlglot.errors.OptimizeError, ValueError) as e: raise SqlUnderstandingError( f"sqlglot failed to compute some lineage: {e}" @@ -700,6 +745,12 @@ def _extract_select_from_create( return statement +def _is_create_table_ddl(statement: sqlglot.exp.Expression) -> bool: + return isinstance(statement, sqlglot.exp.Create) and isinstance( + statement.this, sqlglot.exp.Schema + ) + + def _try_extract_select( statement: sqlglot.exp.Expression, ) -> sqlglot.exp.Expression: @@ -766,6 +817,7 @@ def _translate_sqlglot_type( def _translate_internal_column_lineage( table_name_urn_mapping: Dict[_TableName, str], raw_column_lineage: _ColumnLineageInfo, + dialect: str, ) -> ColumnLineageInfo: downstream_urn = None if raw_column_lineage.downstream.table: @@ -779,7 +831,9 @@ def _translate_internal_column_lineage( ) if raw_column_lineage.downstream.column_type else None, - native_column_type=raw_column_lineage.downstream.column_type.sql() + native_column_type=raw_column_lineage.downstream.column_type.sql( + dialect=dialect + ) if raw_column_lineage.downstream.column_type and raw_column_lineage.downstream.column_type.this != sqlglot.exp.DataType.Type.UNKNOWN @@ -800,12 +854,14 @@ def _get_dialect(platform: str) -> str: # TODO: convert datahub platform names to sqlglot dialect if platform == "presto-on-hive": return "hive" + if platform == "mssql": + return "tsql" else: return platform def _sqlglot_lineage_inner( - sql: str, + sql: sqlglot.exp.ExpOrStr, schema_resolver: SchemaResolver, default_db: Optional[str] = None, default_schema: Optional[str] = None, @@ -918,7 +974,7 @@ def _sqlglot_lineage_inner( if column_lineage: column_lineage_urns = [ _translate_internal_column_lineage( - table_name_urn_mapping, internal_col_lineage + table_name_urn_mapping, internal_col_lineage, dialect=dialect ) for internal_col_lineage in column_lineage ] diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json index f0175b4dc88927..d610b0a83f2290 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_create_view_with_cte.json @@ -18,7 +18,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -36,7 +36,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -54,7 +54,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -72,7 +72,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json index b7df5444987f23..2d3d188d28316d 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_from_sharded_table_wildcard.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -32,7 +32,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json index 67e306bebf5459..41ae0885941b00 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_nested_subqueries.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -32,7 +32,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json index b7df5444987f23..2d3d188d28316d 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_sharded_table_normalization.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -32,7 +32,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json index b393b2445d6c4a..26f8f8f59a3ff6 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_star_with_replace.json @@ -16,7 +16,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -34,7 +34,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -52,7 +52,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json index 53fb94300e8042..83365c09f69c20 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_view_from_union.json @@ -17,7 +17,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { @@ -39,7 +39,7 @@ "com.linkedin.pegasus2avro.schema.StringType": {} } }, - "native_column_type": "TEXT" + "native_column_type": "STRING" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json index 4773974545bfab..cf31b71cb50f6b 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_table_ddl.json @@ -4,5 +4,58 @@ "out_tables": [ "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)" ], - "column_lineage": null + "column_lineage": [ + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)", + "column": "id", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "INTEGER" + }, + "upstreams": [] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)", + "column": "month", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "TEXT" + }, + "upstreams": [] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)", + "column": "total_cost", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "REAL" + }, + "upstreams": [] + }, + { + "downstream": { + "table": "urn:li:dataset:(urn:li:dataPlatform:sqlite,costs,PROD)", + "column": "area", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "native_column_type": "REAL" + }, + "upstreams": [] + } + ] } \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json index ff452467aa5bdd..8a6b60d0f1bde8 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_create_view_as_select.json @@ -30,7 +30,7 @@ "com.linkedin.pegasus2avro.schema.NumberType": {} } }, - "native_column_type": "BIGINT" + "native_column_type": "NUMBER" }, "upstreams": [] }, diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json index 5ad847e252497a..2424fcda347524 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_from_struct_subfields.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.NumberType": {} } }, - "native_column_type": "DECIMAL" + "native_column_type": "NUMERIC" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json index 6ee3d2e61c39b7..8dd2633eff6128 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_select_with_full_col_name.json @@ -14,7 +14,7 @@ "com.linkedin.pegasus2avro.schema.NumberType": {} } }, - "native_column_type": "DECIMAL" + "native_column_type": "NUMERIC" }, "upstreams": [ { diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json index b0351a7e07ad2d..ee80285d87f60b 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_teradata_default_normalization.json @@ -12,6 +12,7 @@ "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)", "column": "PatientId", + "column_type": null, "native_column_type": "INTEGER()" }, "upstreams": [ @@ -25,6 +26,7 @@ "downstream": { "table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)", "column": "BMI", + "column_type": null, "native_column_type": "FLOAT()" }, "upstreams": [