diff --git a/metaphor/azure_data_factory/extractor.py b/metaphor/azure_data_factory/extractor.py index 96d6f2ef..b5417f63 100644 --- a/metaphor/azure_data_factory/extractor.py +++ b/metaphor/azure_data_factory/extractor.py @@ -29,6 +29,7 @@ DatasetLogicalID, DatasetUpstream, DependencyCondition, + EntityUpstream, Pipeline, PipelineInfo, PipelineLogicalID, @@ -78,8 +79,16 @@ async def extract(self) -> Collection[ENTITY_TYPES]: for factory in self._get_factories(df_client): self.extract_for_factory(factory, df_client) - # Remove duplicate source_dataset, and set None for empty upstream data + # Remove duplicate source_entity and source_dataset, and set None for empty upstream data for dataset in self._datasets.values(): + unique_source_entities = unique_list( + dataset.entity_upstream.source_entities + ) + if len(unique_source_entities) == 0: + dataset.entity_upstream = None + else: + dataset.entity_upstream.source_entities = unique_source_entities + unique_source_dataset = unique_list(dataset.upstream.source_datasets) if len(unique_source_dataset) == 0: dataset.upstream = None @@ -188,6 +197,7 @@ def _extract_table_lineage( source_entity_id = str( to_dataset_entity_id_from_logical_id(source.logical_id) ) + sink.entity_upstream.source_entities.append(source_entity_id) sink.upstream.source_datasets.append(source_entity_id) def _get_data_flows( @@ -266,6 +276,7 @@ def _get_datasets( name=dataset_normalized_name(database, schema, table), platform=DataPlatform.SNOWFLAKE, ), + entity_upstream=EntityUpstream(source_entities=[]), upstream=DatasetUpstream(source_datasets=[]), ) @@ -285,6 +296,7 @@ def _get_datasets( ), platform=DataPlatform.MSSQL, ), + entity_upstream=EntityUpstream(source_entities=[]), upstream=DatasetUpstream(source_datasets=[]), ) @@ -315,6 +327,7 @@ def _get_datasets( name=full_path, platform=DataPlatform.AZURE_BLOB_STORAGE, ), + entity_upstream=EntityUpstream(source_entities=[]), upstream=DatasetUpstream(source_datasets=[]), ) @@ -536,6 +549,7 @@ def _process_activities( # Copy activity table lineage for source_id in copy_from: + dataset.entity_upstream.source_entities.append(source_id) dataset.upstream.source_datasets.append(source_id) # Pipeline info for each copy activity output entity diff --git a/metaphor/bigquery/lineage/extractor.py b/metaphor/bigquery/lineage/extractor.py index 46a9644e..7506c2f9 100644 --- a/metaphor/bigquery/lineage/extractor.py +++ b/metaphor/bigquery/lineage/extractor.py @@ -29,6 +29,7 @@ Dataset, DatasetLogicalID, DatasetUpstream, + EntityUpstream, ) logger = get_logger() @@ -142,6 +143,9 @@ def _parse_view_lineage(self, project_id, bq_table: bigquery.table.Table) -> Non if dataset_ids: dataset = self._init_dataset(view_name) + dataset.entity_upstream = EntityUpstream( + source_entities=list(dataset_ids), transformation=view_query + ) dataset.upstream = DatasetUpstream( source_datasets=list(dataset_ids), transformation=view_query ) @@ -191,11 +195,16 @@ def _parse_job_change_entry(self, entry: LogEntry): table_name = destination.table_name() dataset = self._init_dataset(table_name) + source_entities = [ + str(to_dataset_entity_id(source.table_name(), DataPlatform.BIGQUERY)) + for source in job_change.source_tables + ] + dataset.entity_upstream = EntityUpstream( + source_entities=source_entities, + transformation=job_change.query, + ) dataset.upstream = DatasetUpstream( - source_datasets=[ - str(to_dataset_entity_id(source.table_name(), DataPlatform.BIGQUERY)) - for source in job_change.source_tables - ], + source_datasets=source_entities, transformation=job_change.query, ) diff --git a/metaphor/custom/lineage/extractor.py b/metaphor/custom/lineage/extractor.py index c2a124c9..d1a4aa6c 100644 --- a/metaphor/custom/lineage/extractor.py +++ b/metaphor/custom/lineage/extractor.py @@ -9,6 +9,7 @@ Dataset, DatasetUpstream, EntityType, + EntityUpstream, MetadataChangeEvent, ) @@ -41,12 +42,12 @@ async def extract(self) -> List[MetadataChangeEvent]: logical_id=id.to_logical_id() ) + unique_datasets = unique_list(source_datasets) datasets.append( Dataset( logical_id=lineage.dataset.to_logical_id(), - upstream=DatasetUpstream( - source_datasets=unique_list(source_datasets) - ), + entity_upstream=EntityUpstream(source_entities=unique_datasets), + upstream=DatasetUpstream(source_datasets=unique_datasets), ) ) diff --git a/metaphor/fivetran/extractor.py b/metaphor/fivetran/extractor.py index cd41df57..8c6ca5c4 100644 --- a/metaphor/fivetran/extractor.py +++ b/metaphor/fivetran/extractor.py @@ -35,6 +35,7 @@ Dataset, DatasetLogicalID, DatasetUpstream, + EntityUpstream, FieldMapping, FiveTranConnectorStatus, FivetranPipeline, @@ -281,6 +282,10 @@ def _init_dataset( platform=destination_platform, account=self.get_snowflake_account_from_config(destination.config), ), + entity_upstream=EntityUpstream( + source_entities=[], + field_mappings=[], + ), upstream=DatasetUpstream( source_datasets=[], field_mappings=[], @@ -309,6 +314,7 @@ def _init_dataset( pipeline_mapping.is_virtual = False pipeline_mapping.source_entity_id = source_entity_id + dataset.entity_upstream.source_entities = [source_entity_id] dataset.upstream.source_datasets = [source_entity_id] pipeline.fivetran.sources = list( @@ -326,6 +332,7 @@ def _init_dataset( field=column.name_in_source, ) ) + dataset.entity_upstream.field_mappings.append(field_mapping) dataset.upstream.field_mappings.append(field_mapping) dataset.pipeline_info = PipelineInfo(pipeline_mapping=[pipeline_mapping]) diff --git a/metaphor/redshift/lineage/extractor.py b/metaphor/redshift/lineage/extractor.py index f7cb70b1..16470584 100644 --- a/metaphor/redshift/lineage/extractor.py +++ b/metaphor/redshift/lineage/extractor.py @@ -15,6 +15,7 @@ Dataset, DatasetLogicalID, DatasetUpstream, + EntityUpstream, ) from metaphor.postgresql.extractor import PostgreSQLExtractor from metaphor.redshift.lineage.config import RedshiftLineageRunConfig @@ -190,8 +191,9 @@ def format_table_name(table: Table, database: str) -> str: for source in sources ] - self._init_dataset_with_upstream( + self._init_dataset_with_entity_upstream( target, + EntityUpstream(source_entities=source_ids, transformation=query), DatasetUpstream(source_datasets=source_ids, transformation=query), ) @@ -222,21 +224,31 @@ async def _fetch_lineage(self, sql, conn, db) -> None: for target_table_name in upstream_map.keys(): sources, query = upstream_map[target_table_name] - self._init_dataset_with_upstream( + unique_sources = unique_list(sources) + self._init_dataset_with_entity_upstream( target_table_name, + EntityUpstream( + source_entities=unique_sources, + transformation=query, + ), DatasetUpstream( - source_datasets=unique_list(sources), transformation=query + source_datasets=unique_sources, + transformation=query, ), ) - def _init_dataset_with_upstream( - self, table_name: str, upstream: DatasetUpstream + def _init_dataset_with_entity_upstream( + self, + table_name: str, + entity_upstream: EntityUpstream, + upstream: DatasetUpstream, ) -> Dataset: if table_name not in self._datasets: self._datasets[table_name] = Dataset( logical_id=DatasetLogicalID( name=table_name, platform=DataPlatform.REDSHIFT ), + entity_upstream=entity_upstream, upstream=upstream, ) return self._datasets[table_name] diff --git a/metaphor/snowflake/lineage/extractor.py b/metaphor/snowflake/lineage/extractor.py index ab776577..bf16ddb7 100644 --- a/metaphor/snowflake/lineage/extractor.py +++ b/metaphor/snowflake/lineage/extractor.py @@ -20,6 +20,7 @@ Dataset, DatasetLogicalID, DatasetUpstream, + EntityUpstream, ) from metaphor.snowflake import auth from metaphor.snowflake.accessed_object import AccessedObject @@ -215,12 +216,17 @@ def _parse_access_log( # Nothing to remove if there's no self lineage pass + entity_upstream = EntityUpstream( + source_entities=filtered_source_datasets, transformation=query + ) upstream = DatasetUpstream( source_datasets=filtered_source_datasets, transformation=query ) self._datasets[normalized_name] = Dataset( - logical_id=logical_id, upstream=upstream + logical_id=logical_id, + entity_upstream=entity_upstream, + upstream=upstream, ) def _parse_object_dependencies( @@ -268,13 +274,18 @@ def _parse_object_dependencies( ) if target_normalized_name in self._datasets: - source_datasets = self._datasets[ - target_normalized_name - ].upstream.source_datasets + dataset = self._datasets[target_normalized_name] + source_entities = dataset.entity_upstream.source_entities + if source_entity_id_str not in source_entities: + source_entities.append(source_entity_id_str) + source_datasets = dataset.upstream.source_datasets if source_entity_id_str not in source_datasets: source_datasets.append(source_entity_id_str) else: self._datasets[target_normalized_name] = Dataset( logical_id=target_logical_id, + entity_upstream=EntityUpstream( + source_entities=[source_entity_id_str] + ), upstream=DatasetUpstream(source_datasets=[source_entity_id_str]), ) diff --git a/metaphor/unity_catalog/extractor.py b/metaphor/unity_catalog/extractor.py index f17c4b7b..657ad465 100644 --- a/metaphor/unity_catalog/extractor.py +++ b/metaphor/unity_catalog/extractor.py @@ -25,6 +25,7 @@ DatasetSchema, DatasetStructure, DatasetUpstream, + EntityUpstream, FieldMapping, KeyValuePair, MaterializationType, @@ -275,8 +276,12 @@ def _populate_lineage(self, dataset: Dataset): f"Unable to extract lineage for {table_name} due to permission issues" ) + unique_datasets = unique_list(source_datasets) + dataset.entity_upstream = EntityUpstream( + source_entities=unique_datasets, field_mappings=field_mappings + ) dataset.upstream = DatasetUpstream( - source_datasets=unique_list(source_datasets), field_mappings=field_mappings + source_datasets=unique_datasets, field_mappings=field_mappings ) def _get_query_logs(self) -> QueryLogs: diff --git a/pyproject.toml b/pyproject.toml index c081fb39..bc9b07f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.13.43" +version = "0.13.44" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/azure_data_factory/expected.json b/tests/azure_data_factory/expected.json index 8a896db3..da3ab313 100644 --- a/tests/azure_data_factory/expected.json +++ b/tests/azure_data_factory/expected.json @@ -12,6 +12,11 @@ "name": "database.schema.table2", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~D0BB6D0EA7E559C7966357BE929E840B" + ] + }, "upstream": { "sourceDatasets": [ "DATASET~D0BB6D0EA7E559C7966357BE929E840B" @@ -39,6 +44,11 @@ "name": "https://storage-account.blob.core.windows.net/test/foo/bar.json", "platform": "AZURE_BLOB_STORAGE" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~3D47E66557297571AD3225FF197AE151" + ] + }, "upstream": { "sourceDatasets": [ "DATASET~3D47E66557297571AD3225FF197AE151" diff --git a/tests/bigquery/lineage/data/result.json b/tests/bigquery/lineage/data/result.json index 633c6324..cb2eef63 100644 --- a/tests/bigquery/lineage/data/result.json +++ b/tests/bigquery/lineage/data/result.json @@ -4,6 +4,11 @@ "name": "metaphor-data.test.yi_tests2", "platform": "BIGQUERY" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~930E4BD28074A60959C98F61289311E0" + ] + }, "upstream": { "sourceDatasets": [ "DATASET~930E4BD28074A60959C98F61289311E0" @@ -15,6 +20,15 @@ "name": "metaphor-data.test.yi_tests", "platform": "BIGQUERY" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~930E4BD28074A60959C98F61289311E0", + "DATASET~E16CCC1A55EE3E61C1CE8B0C47F67998", + "DATASET~55CAA090B9E8317AFC195CBD843D6665", + "DATASET~6BC1F571B76A5BBE6B1F6C0E7DCA533C" + ], + "transformation": "INSERT INTO `metaphor-data.test.yi_test3` \nSELECT * from `metaphor-data.test.yi_tests1` \nUNION ALL \nSELECT * from `metaphor-data.test.yi_tests2`" + }, "upstream": { "sourceDatasets": [ "DATASET~930E4BD28074A60959C98F61289311E0", diff --git a/tests/bigquery/lineage/data/view_result.json b/tests/bigquery/lineage/data/view_result.json index a13bec9c..e44baca6 100644 --- a/tests/bigquery/lineage/data/view_result.json +++ b/tests/bigquery/lineage/data/view_result.json @@ -4,6 +4,12 @@ "name": "project1.dataset1.table1", "platform": "BIGQUERY" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~A6BA6F986B360A57CF65200F29F5B251" + ], + "transformation": "select * from `foo`" + }, "upstream": { "sourceDatasets": [ "DATASET~A6BA6F986B360A57CF65200F29F5B251" @@ -16,6 +22,12 @@ "name": "project1.dataset1.table2", "platform": "BIGQUERY" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~A6BA6F986B360A57CF65200F29F5B251" + ], + "transformation": "select * from `Foo`" + }, "upstream": { "sourceDatasets": [ "DATASET~A6BA6F986B360A57CF65200F29F5B251" @@ -28,6 +40,12 @@ "name": "project1.dataset1.table3", "platform": "BIGQUERY" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~A6BA6F986B360A57CF65200F29F5B251" + ], + "transformation": "select * from foo" + }, "upstream": { "sourceDatasets": [ "DATASET~A6BA6F986B360A57CF65200F29F5B251" diff --git a/tests/custom/lineage/expected.json b/tests/custom/lineage/expected.json index 94bd74c9..4c0c1846 100644 --- a/tests/custom/lineage/expected.json +++ b/tests/custom/lineage/expected.json @@ -4,6 +4,12 @@ "name": "foo", "platform": "BIGQUERY" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~856FE6D6B85A5758921110D2A1FC5E60", + "DATASET~2C268725B528CB0203E0F1AA22B95D65" + ] + }, "upstream": { "sourceDatasets": [ "DATASET~856FE6D6B85A5758921110D2A1FC5E60", diff --git a/tests/fivetran/expected.json b/tests/fivetran/expected.json index 6c0ef337..6fb84ef5 100644 --- a/tests/fivetran/expected.json +++ b/tests/fivetran/expected.json @@ -5,6 +5,10 @@ "name": "fivetran.google_drive.dataset_foo_sheet_1", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "fieldMappings": [], + "sourceEntities": [] + }, "upstream": { "fieldMappings": [], "sourceDatasets": [] @@ -24,6 +28,41 @@ "name": "fivetran.snowflake_db_schema.table", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "fieldMappings": [ + { + "destination": "col1", + "sources": [ + { + "dataset": { + "account": "source_account", + "name": "source_db.schema.table", + "platform": "SNOWFLAKE" + }, + "field": "col1", + "sourceEntityId": "DATASET~E71466A1A1CE8D63F92424B3CF3F3F4E" + } + ] + }, + { + "destination": "col2", + "sources": [ + { + "dataset": { + "account": "source_account", + "name": "source_db.schema.table", + "platform": "SNOWFLAKE" + }, + "field": "col2", + "sourceEntityId": "DATASET~E71466A1A1CE8D63F92424B3CF3F3F4E" + } + ] + } + ], + "sourceEntities": [ + "DATASET~E71466A1A1CE8D63F92424B3CF3F3F4E" + ] + }, "upstream": { "fieldMappings": [ { @@ -75,6 +114,27 @@ "name": "fivetran.snowflake_db_schema.table2", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "fieldMappings": [ + { + "destination": "col1", + "sources": [ + { + "dataset": { + "account": "source_account", + "name": "source_db.schema.table2", + "platform": "SNOWFLAKE" + }, + "field": "col1", + "sourceEntityId": "DATASET~F31B21D3EDB9FF855528E9D6679840C7" + } + ] + } + ], + "sourceEntities": [ + "DATASET~F31B21D3EDB9FF855528E9D6679840C7" + ] + }, "upstream": { "fieldMappings": [ { @@ -112,6 +172,41 @@ "name": "fivetran.sqldb_foo.table", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "fieldMappings": [ + { + "destination": "col1", + "sources": [ + { + "dataset": { + "account": "sql-server.foo.bar", + "name": "source_db.schema.table", + "platform": "MSSQL" + }, + "field": "col1", + "sourceEntityId": "DATASET~CA36B7F1E380295A5286B5E231A6A088" + } + ] + }, + { + "destination": "col2", + "sources": [ + { + "dataset": { + "account": "sql-server.foo.bar", + "name": "source_db.schema.table", + "platform": "MSSQL" + }, + "field": "col2", + "sourceEntityId": "DATASET~CA36B7F1E380295A5286B5E231A6A088" + } + ] + } + ], + "sourceEntities": [ + "DATASET~CA36B7F1E380295A5286B5E231A6A088" + ] + }, "upstream": { "fieldMappings": [ { @@ -163,7 +258,7 @@ "name": "fivetran.sqldb_foo.table2", "platform": "SNOWFLAKE" }, - "upstream": { + "entityUpstream": { "fieldMappings": [ { "destination": "col1", @@ -180,7 +275,7 @@ ] } ], - "sourceDatasets": [ + "sourceEntities": [ "DATASET~640B6534F4B207F3A50447769DB47B38" ] }, @@ -192,6 +287,27 @@ "sourceEntityId": "DATASET~640B6534F4B207F3A50447769DB47B38" } ] + }, + "upstream": { + "fieldMappings": [ + { + "destination": "col1", + "sources": [ + { + "dataset": { + "account": "sql-server.foo.bar", + "name": "source_db.schema.table2", + "platform": "MSSQL" + }, + "field": "col1", + "sourceEntityId": "DATASET~640B6534F4B207F3A50447769DB47B38" + } + ] + } + ], + "sourceDatasets": [ + "DATASET~640B6534F4B207F3A50447769DB47B38" + ] } }, { diff --git a/tests/redshift/lineage/data/result_exclude_self_lineage.json b/tests/redshift/lineage/data/result_exclude_self_lineage.json index b66e0505..420e1e98 100644 --- a/tests/redshift/lineage/data/result_exclude_self_lineage.json +++ b/tests/redshift/lineage/data/result_exclude_self_lineage.json @@ -4,6 +4,12 @@ "name": "test.product.foo", "platform": "REDSHIFT" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~80035CA83670B9F71F2329A249B4473E" + ], + "transformation": "INSERT INTO product.foo pf SELECT * FROM stock.foo sf WHERE sf.price > 0" + }, "upstream": { "sourceDatasets": [ "DATASET~80035CA83670B9F71F2329A249B4473E" @@ -16,6 +22,13 @@ "name": "test.product.bar", "platform": "REDSHIFT" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~7430F4962E505E847BC1F0EEDE30CDE8", + "DATASET~C6D251ED970F700029534E6ACFE4E356" + ], + "transformation": "CREATE TABLE product.bar AS SELECT * FROM stock.bar sbr JOIN stock.baz sbz ON sbr.id == sbz.id" + }, "upstream": { "sourceDatasets": [ "DATASET~7430F4962E505E847BC1F0EEDE30CDE8", diff --git a/tests/redshift/lineage/data/result_include_self_lineage.json b/tests/redshift/lineage/data/result_include_self_lineage.json index 840e31e9..d4414a43 100644 --- a/tests/redshift/lineage/data/result_include_self_lineage.json +++ b/tests/redshift/lineage/data/result_include_self_lineage.json @@ -1,20 +1,29 @@ [ { - "logicalId": { - "name": "test.product.foo", - "platform": "REDSHIFT" + "entityUpstream": { + "sourceEntities": [ + "DATASET~80035CA83670B9F71F2329A249B4473E" + ], + "transformation": "INSERT INTO product.foo pf SELECT * FROM stock.foo sf WHERE sf.price > 0" }, "upstream": { "sourceDatasets": [ "DATASET~80035CA83670B9F71F2329A249B4473E" ], "transformation": "INSERT INTO product.foo pf SELECT * FROM stock.foo sf WHERE sf.price > 0" + }, + "logicalId": { + "name": "test.product.foo", + "platform": "REDSHIFT" } }, { - "logicalId": { - "name": "test.product.bar", - "platform": "REDSHIFT" + "entityUpstream": { + "sourceEntities": [ + "DATASET~7430F4962E505E847BC1F0EEDE30CDE8", + "DATASET~C6D251ED970F700029534E6ACFE4E356" + ], + "transformation": "CREATE TABLE product.bar AS SELECT * FROM stock.bar sbr JOIN stock.baz sbz ON sbr.id == sbz.id" }, "upstream": { "sourceDatasets": [ @@ -22,18 +31,28 @@ "DATASET~C6D251ED970F700029534E6ACFE4E356" ], "transformation": "CREATE TABLE product.bar AS SELECT * FROM stock.bar sbr JOIN stock.baz sbz ON sbr.id == sbz.id" + }, + "logicalId": { + "name": "test.product.bar", + "platform": "REDSHIFT" } }, { - "logicalId": { - "name": "test.public.self", - "platform": "REDSHIFT" + "entityUpstream": { + "sourceEntities": [ + "DATASET~01CDC7A4EBF2F45A3D17C3996CA4FBD2" + ], + "transformation": "INSERT INTO public.self t SELECT * FROM public.self s WHERE sf.price > 0" }, "upstream": { "sourceDatasets": [ "DATASET~01CDC7A4EBF2F45A3D17C3996CA4FBD2" ], "transformation": "INSERT INTO public.self t SELECT * FROM public.self s WHERE sf.price > 0" + }, + "logicalId": { + "name": "test.public.self", + "platform": "REDSHIFT" } } ] diff --git a/tests/redshift/lineage/data/result_view.json b/tests/redshift/lineage/data/result_view.json index f041deef..c98b300e 100644 --- a/tests/redshift/lineage/data/result_view.json +++ b/tests/redshift/lineage/data/result_view.json @@ -4,6 +4,12 @@ "name": "test.private.t1", "platform": "REDSHIFT" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~CFCE7386EFABDA3E3ED4DCD5B69083FC", + "DATASET~B077275BCBF5E69AD0DBFCAE3F7E47E4" + ] + }, "upstream": { "sourceDatasets": [ "DATASET~CFCE7386EFABDA3E3ED4DCD5B69083FC", @@ -16,6 +22,11 @@ "name": "test.foo.t2", "platform": "REDSHIFT" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~CFCE7386EFABDA3E3ED4DCD5B69083FC" + ] + }, "upstream": { "sourceDatasets": [ "DATASET~CFCE7386EFABDA3E3ED4DCD5B69083FC" diff --git a/tests/snowflake/lineage/data/parse_object_dependencies_result.json b/tests/snowflake/lineage/data/parse_object_dependencies_result.json index 412a4a1d..bd2a0065 100644 --- a/tests/snowflake/lineage/data/parse_object_dependencies_result.json +++ b/tests/snowflake/lineage/data/parse_object_dependencies_result.json @@ -5,6 +5,11 @@ "name": "acme.metaphor.bar", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~EAEA28F30430853E9C299356A7C4CB46" + ] + }, "upstream": { "sourceDatasets": [ "DATASET~EAEA28F30430853E9C299356A7C4CB46" @@ -17,6 +22,11 @@ "name": "acme.metaphor.xyz", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~219FE88EBECD32D96D20E516E08E4DA0" + ] + }, "upstream": { "sourceDatasets": [ "DATASET~219FE88EBECD32D96D20E516E08E4DA0" diff --git a/tests/snowflake/lineage/data/parse_query_log_result_exclude_self_lineage.json b/tests/snowflake/lineage/data/parse_query_log_result_exclude_self_lineage.json index a9011596..ebd2584a 100644 --- a/tests/snowflake/lineage/data/parse_query_log_result_exclude_self_lineage.json +++ b/tests/snowflake/lineage/data/parse_query_log_result_exclude_self_lineage.json @@ -5,6 +5,13 @@ "name": "db2.schema1.table1", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~E6F80D26D79C59FEF0BA6B6DA45BC08F", + "DATASET~C9DAA548172E27E915ED3A2AB81A56C4" + ], + "transformation": "query" + }, "upstream": { "sourceDatasets": [ "DATASET~E6F80D26D79C59FEF0BA6B6DA45BC08F", @@ -19,6 +26,14 @@ "name": "db2.schema1.table2", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~E6F80D26D79C59FEF0BA6B6DA45BC08F", + "DATASET~C9DAA548172E27E915ED3A2AB81A56C4", + "DATASET~81BBDE0AB867B318A1348E654DC7F426" + ], + "transformation": "query" + }, "upstream": { "sourceDatasets": [ "DATASET~E6F80D26D79C59FEF0BA6B6DA45BC08F", diff --git a/tests/snowflake/lineage/data/parse_query_log_result_include_self_lineage.json b/tests/snowflake/lineage/data/parse_query_log_result_include_self_lineage.json index 6664a884..e64ce8d3 100644 --- a/tests/snowflake/lineage/data/parse_query_log_result_include_self_lineage.json +++ b/tests/snowflake/lineage/data/parse_query_log_result_include_self_lineage.json @@ -5,6 +5,14 @@ "name": "db2.schema1.table1", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~E6F80D26D79C59FEF0BA6B6DA45BC08F", + "DATASET~C9DAA548172E27E915ED3A2AB81A56C4", + "DATASET~81BBDE0AB867B318A1348E654DC7F426" + ], + "transformation": "query" + }, "upstream": { "sourceDatasets": [ "DATASET~E6F80D26D79C59FEF0BA6B6DA45BC08F", @@ -20,6 +28,14 @@ "name": "db2.schema1.table2", "platform": "SNOWFLAKE" }, + "entityUpstream": { + "sourceEntities": [ + "DATASET~E6F80D26D79C59FEF0BA6B6DA45BC08F", + "DATASET~C9DAA548172E27E915ED3A2AB81A56C4", + "DATASET~81BBDE0AB867B318A1348E654DC7F426" + ], + "transformation": "query" + }, "upstream": { "sourceDatasets": [ "DATASET~E6F80D26D79C59FEF0BA6B6DA45BC08F", diff --git a/tests/unity_catalog/expected.json b/tests/unity_catalog/expected.json index aaa38866..7c750a0d 100644 --- a/tests/unity_catalog/expected.json +++ b/tests/unity_catalog/expected.json @@ -37,6 +37,27 @@ "schema": "schema", "table": "table" }, + "entityUpstream": { + "fieldMappings": [ + { + "destination": "col1", + "sources": [ + { + "dataset": { + "name": "db.schema.upstream", + "platform": "UNITY_CATALOG" + }, + "field": "col1", + "sourceEntityId": "DATASET~4B3CF34E5B62D97FAF33F75C7B32BB84" + } + ] + } + ], + "sourceEntities": [ + "DATASET~4B3CF34E5B62D97FAF33F75C7B32BB84", + "DATASET~97D032124F4B526411F0D04797CEAC96" + ] + }, "upstream": { "fieldMappings": [ {