Migrate all crawlers to emit entityUpstream (#680)
* Migrate all crawlers to emit entityUpstream

* add pack DatasetUpstream

* bump version
usefulalgorithm authored Nov 13, 2023
1 parent 52ba1fa commit 6a461ae
Showing 20 changed files with 358 additions and 30 deletions.
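
Every connector touched by this commit now writes the new entity-generic entityUpstream field alongside the legacy dataset-only upstream field, keeping the two in sync during the migration. A minimal, self-contained sketch of that dual-write pattern — the classes below are stand-ins for the real metaphor model types, not the library's API, and unique_list mirrors the order-preserving helper the connectors already use:

    from dataclasses import dataclass, field
    from typing import List, Optional


    def unique_list(items: List[str]) -> List[str]:
        # Drop duplicates while preserving order, as the connectors' helper does.
        return list(dict.fromkeys(items))


    @dataclass
    class EntityUpstream:  # stand-in for the new entity-generic model
        source_entities: List[str] = field(default_factory=list)
        transformation: Optional[str] = None


    @dataclass
    class DatasetUpstream:  # stand-in for the legacy dataset-only model
        source_datasets: List[str] = field(default_factory=list)
        transformation: Optional[str] = None


    def build_lineage(source_ids: List[str], query: Optional[str] = None):
        # Emit both representations from the same deduplicated ID list so
        # that old and new consumers of the metadata stay consistent.
        sources = unique_list(source_ids)
        return (
            EntityUpstream(source_entities=sources, transformation=query),
            DatasetUpstream(source_datasets=sources, transformation=query),
        )


    entity_upstream, upstream = build_lineage(
        ["DATASET~AAA", "DATASET~BBB", "DATASET~AAA"], query="select * from foo"
    )
    assert entity_upstream.source_entities == upstream.source_datasets == [
        "DATASET~AAA",
        "DATASET~BBB",
    ]
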
16 changes: 15 additions & 1 deletion metaphor/azure_data_factory/extractor.py
@@ -29,6 +29,7 @@
     DatasetLogicalID,
     DatasetUpstream,
     DependencyCondition,
+    EntityUpstream,
     Pipeline,
     PipelineInfo,
     PipelineLogicalID,
@@ -78,8 +79,16 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
         for factory in self._get_factories(df_client):
             self.extract_for_factory(factory, df_client)

-        # Remove duplicate source_dataset, and set None for empty upstream data
+        # Remove duplicate source_entity and source_dataset, and set None for empty upstream data
         for dataset in self._datasets.values():
+            unique_source_entities = unique_list(
+                dataset.entity_upstream.source_entities
+            )
+            if len(unique_source_entities) == 0:
+                dataset.entity_upstream = None
+            else:
+                dataset.entity_upstream.source_entities = unique_source_entities
+
             unique_source_dataset = unique_list(dataset.upstream.source_datasets)
             if len(unique_source_dataset) == 0:
                 dataset.upstream = None
@@ -188,6 +197,7 @@ def _extract_table_lineage(
             source_entity_id = str(
                 to_dataset_entity_id_from_logical_id(source.logical_id)
             )
+            sink.entity_upstream.source_entities.append(source_entity_id)
             sink.upstream.source_datasets.append(source_entity_id)

     def _get_data_flows(
@@ -266,6 +276,7 @@ def _get_datasets(
                 name=dataset_normalized_name(database, schema, table),
                 platform=DataPlatform.SNOWFLAKE,
             ),
+            entity_upstream=EntityUpstream(source_entities=[]),
             upstream=DatasetUpstream(source_datasets=[]),
         )

@@ -285,6 +296,7 @@ def _get_datasets(
                 ),
                 platform=DataPlatform.MSSQL,
             ),
+            entity_upstream=EntityUpstream(source_entities=[]),
             upstream=DatasetUpstream(source_datasets=[]),
         )

@@ -315,6 +327,7 @@ def _get_datasets(
                 name=full_path,
                 platform=DataPlatform.AZURE_BLOB_STORAGE,
             ),
+            entity_upstream=EntityUpstream(source_entities=[]),
             upstream=DatasetUpstream(source_datasets=[]),
         )

@@ -536,6 +549,7 @@ def _process_activities(

             # Copy activity table lineage
             for source_id in copy_from:
+                dataset.entity_upstream.source_entities.append(source_id)
                 dataset.upstream.source_datasets.append(source_id)

             # Pipeline info for each copy activity output entity
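
The post-extraction cleanup in the extract() hunk above deduplicates the collected IDs and nulls out either field entirely when no lineage was found, so empty objects never reach the output. A small runnable sketch of that finalization step, under the same stand-in assumptions as the earlier example (SimpleNamespace substitutes for the real Dataset model):

    from types import SimpleNamespace


    def unique_list(items):
        # Order-preserving dedupe, mirroring the helper used in the extractor.
        return list(dict.fromkeys(items))


    def finalize(dataset) -> None:
        # Dedupe each field's source IDs; drop the whole object when empty.
        uniques = unique_list(dataset.entity_upstream.source_entities)
        if not uniques:
            dataset.entity_upstream = None
        else:
            dataset.entity_upstream.source_entities = uniques

        unique_datasets = unique_list(dataset.upstream.source_datasets)
        if not unique_datasets:
            dataset.upstream = None
        else:
            dataset.upstream.source_datasets = unique_datasets


    ds = SimpleNamespace(
        entity_upstream=SimpleNamespace(source_entities=["A", "B", "A"]),
        upstream=SimpleNamespace(source_datasets=[]),
    )
    finalize(ds)
    assert ds.entity_upstream.source_entities == ["A", "B"]
    assert ds.upstream is None  # empty lineage is omitted entirely
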
17 changes: 13 additions & 4 deletions metaphor/bigquery/lineage/extractor.py
@@ -29,6 +29,7 @@
     Dataset,
     DatasetLogicalID,
     DatasetUpstream,
+    EntityUpstream,
 )

 logger = get_logger()
@@ -142,6 +143,9 @@ def _parse_view_lineage(self, project_id, bq_table: bigquery.table.Table) -> None:

         if dataset_ids:
             dataset = self._init_dataset(view_name)
+            dataset.entity_upstream = EntityUpstream(
+                source_entities=list(dataset_ids), transformation=view_query
+            )
             dataset.upstream = DatasetUpstream(
                 source_datasets=list(dataset_ids), transformation=view_query
             )
@@ -191,11 +195,16 @@ def _parse_job_change_entry(self, entry: LogEntry):

         table_name = destination.table_name()
         dataset = self._init_dataset(table_name)
+        source_entities = [
+            str(to_dataset_entity_id(source.table_name(), DataPlatform.BIGQUERY))
+            for source in job_change.source_tables
+        ]
+        dataset.entity_upstream = EntityUpstream(
+            source_entities=source_entities,
+            transformation=job_change.query,
+        )
         dataset.upstream = DatasetUpstream(
-            source_datasets=[
-                str(to_dataset_entity_id(source.table_name(), DataPlatform.BIGQUERY))
-                for source in job_change.source_tables
-            ],
+            source_datasets=source_entities,
             transformation=job_change.query,
         )

7 changes: 4 additions & 3 deletions metaphor/custom/lineage/extractor.py
@@ -9,6 +9,7 @@
     Dataset,
     DatasetUpstream,
     EntityType,
+    EntityUpstream,
     MetadataChangeEvent,
 )

@@ -41,12 +42,12 @@ async def extract(self) -> List[MetadataChangeEvent]:
                 logical_id=id.to_logical_id()
             )

+            unique_datasets = unique_list(source_datasets)
             datasets.append(
                 Dataset(
                     logical_id=lineage.dataset.to_logical_id(),
-                    upstream=DatasetUpstream(
-                        source_datasets=unique_list(source_datasets)
-                    ),
+                    entity_upstream=EntityUpstream(source_entities=unique_datasets),
+                    upstream=DatasetUpstream(source_datasets=unique_datasets),
                 )
             )

7 changes: 7 additions & 0 deletions metaphor/fivetran/extractor.py
@@ -35,6 +35,7 @@
     Dataset,
     DatasetLogicalID,
     DatasetUpstream,
+    EntityUpstream,
     FieldMapping,
     FiveTranConnectorStatus,
     FivetranPipeline,
@@ -281,6 +282,10 @@ def _init_dataset(
                 platform=destination_platform,
                 account=self.get_snowflake_account_from_config(destination.config),
             ),
+            entity_upstream=EntityUpstream(
+                source_entities=[],
+                field_mappings=[],
+            ),
             upstream=DatasetUpstream(
                 source_datasets=[],
                 field_mappings=[],
@@ -309,6 +314,7 @@ def _init_dataset(
             pipeline_mapping.is_virtual = False
             pipeline_mapping.source_entity_id = source_entity_id

+            dataset.entity_upstream.source_entities = [source_entity_id]
             dataset.upstream.source_datasets = [source_entity_id]

             pipeline.fivetran.sources = list(
@@ -326,6 +332,7 @@ def _init_dataset(
                         field=column.name_in_source,
                     )
                 )
+            dataset.entity_upstream.field_mappings.append(field_mapping)
             dataset.upstream.field_mappings.append(field_mapping)

         dataset.pipeline_info = PipelineInfo(pipeline_mapping=[pipeline_mapping])
22 changes: 17 additions & 5 deletions metaphor/redshift/lineage/extractor.py
@@ -15,6 +15,7 @@
     Dataset,
     DatasetLogicalID,
     DatasetUpstream,
+    EntityUpstream,
 )
 from metaphor.postgresql.extractor import PostgreSQLExtractor
 from metaphor.redshift.lineage.config import RedshiftLineageRunConfig
@@ -190,8 +191,9 @@ def format_table_name(table: Table, database: str) -> str:
             for source in sources
         ]

-        self._init_dataset_with_upstream(
+        self._init_dataset_with_entity_upstream(
             target,
+            EntityUpstream(source_entities=source_ids, transformation=query),
             DatasetUpstream(source_datasets=source_ids, transformation=query),
         )

@@ -222,21 +224,31 @@ async def _fetch_lineage(self, sql, conn, db) -> None:

         for target_table_name in upstream_map.keys():
             sources, query = upstream_map[target_table_name]
-            self._init_dataset_with_upstream(
+            unique_sources = unique_list(sources)
+            self._init_dataset_with_entity_upstream(
                 target_table_name,
+                EntityUpstream(
+                    source_entities=unique_sources,
+                    transformation=query,
+                ),
                 DatasetUpstream(
-                    source_datasets=unique_list(sources), transformation=query
+                    source_datasets=unique_sources,
+                    transformation=query,
                 ),
             )

-    def _init_dataset_with_upstream(
-        self, table_name: str, upstream: DatasetUpstream
+    def _init_dataset_with_entity_upstream(
+        self,
+        table_name: str,
+        entity_upstream: EntityUpstream,
+        upstream: DatasetUpstream,
     ) -> Dataset:
         if table_name not in self._datasets:
             self._datasets[table_name] = Dataset(
                 logical_id=DatasetLogicalID(
                     name=table_name, platform=DataPlatform.REDSHIFT
                 ),
+                entity_upstream=entity_upstream,
                 upstream=upstream,
             )
         return self._datasets[table_name]
19 changes: 15 additions & 4 deletions metaphor/snowflake/lineage/extractor.py
@@ -20,6 +20,7 @@
     Dataset,
     DatasetLogicalID,
     DatasetUpstream,
+    EntityUpstream,
 )
 from metaphor.snowflake import auth
 from metaphor.snowflake.accessed_object import AccessedObject
@@ -215,12 +216,17 @@ def _parse_access_log(
                 # Nothing to remove if there's no self lineage
                 pass

+            entity_upstream = EntityUpstream(
+                source_entities=filtered_source_datasets, transformation=query
+            )
             upstream = DatasetUpstream(
                 source_datasets=filtered_source_datasets, transformation=query
             )

             self._datasets[normalized_name] = Dataset(
-                logical_id=logical_id, upstream=upstream
+                logical_id=logical_id,
+                entity_upstream=entity_upstream,
+                upstream=upstream,
             )

     def _parse_object_dependencies(
@@ -268,13 +274,18 @@ def _parse_object_dependencies(
             )

             if target_normalized_name in self._datasets:
-                source_datasets = self._datasets[
-                    target_normalized_name
-                ].upstream.source_datasets
+                dataset = self._datasets[target_normalized_name]
+                source_entities = dataset.entity_upstream.source_entities
+                if source_entity_id_str not in source_entities:
+                    source_entities.append(source_entity_id_str)
+                source_datasets = dataset.upstream.source_datasets
                 if source_entity_id_str not in source_datasets:
                     source_datasets.append(source_entity_id_str)
             else:
                 self._datasets[target_normalized_name] = Dataset(
                     logical_id=target_logical_id,
+                    entity_upstream=EntityUpstream(
+                        source_entities=[source_entity_id_str]
+                    ),
                     upstream=DatasetUpstream(source_datasets=[source_entity_id_str]),
                 )
7 changes: 6 additions & 1 deletion metaphor/unity_catalog/extractor.py
@@ -25,6 +25,7 @@
     DatasetSchema,
     DatasetStructure,
     DatasetUpstream,
+    EntityUpstream,
     FieldMapping,
     KeyValuePair,
     MaterializationType,
@@ -275,8 +276,12 @@ def _populate_lineage(self, dataset: Dataset):
                 f"Unable to extract lineage for {table_name} due to permission issues"
             )

+        unique_datasets = unique_list(source_datasets)
+        dataset.entity_upstream = EntityUpstream(
+            source_entities=unique_datasets, field_mappings=field_mappings
+        )
         dataset.upstream = DatasetUpstream(
-            source_datasets=unique_list(source_datasets), field_mappings=field_mappings
+            source_datasets=unique_datasets, field_mappings=field_mappings
         )

     def _get_query_logs(self) -> QueryLogs:
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.13.43"
+version = "0.13.44"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor <[email protected]>"]
10 changes: 10 additions & 0 deletions tests/azure_data_factory/expected.json
@@ -12,6 +12,11 @@
       "name": "database.schema.table2",
       "platform": "SNOWFLAKE"
     },
+    "entityUpstream": {
+      "sourceEntities": [
+        "DATASET~D0BB6D0EA7E559C7966357BE929E840B"
+      ]
+    },
     "upstream": {
       "sourceDatasets": [
         "DATASET~D0BB6D0EA7E559C7966357BE929E840B"
@@ -39,6 +44,11 @@
       "name": "https://storage-account.blob.core.windows.net/test/foo/bar.json",
       "platform": "AZURE_BLOB_STORAGE"
     },
+    "entityUpstream": {
+      "sourceEntities": [
+        "DATASET~3D47E66557297571AD3225FF197AE151"
+      ]
+    },
     "upstream": {
       "sourceDatasets": [
         "DATASET~3D47E66557297571AD3225FF197AE151"
14 changes: 14 additions & 0 deletions tests/bigquery/lineage/data/result.json
@@ -4,6 +4,11 @@
       "name": "metaphor-data.test.yi_tests2",
       "platform": "BIGQUERY"
     },
+    "entityUpstream": {
+      "sourceEntities": [
+        "DATASET~930E4BD28074A60959C98F61289311E0"
+      ]
+    },
     "upstream": {
       "sourceDatasets": [
         "DATASET~930E4BD28074A60959C98F61289311E0"
@@ -15,6 +20,15 @@
       "name": "metaphor-data.test.yi_tests",
       "platform": "BIGQUERY"
     },
+    "entityUpstream": {
+      "sourceEntities": [
+        "DATASET~930E4BD28074A60959C98F61289311E0",
+        "DATASET~E16CCC1A55EE3E61C1CE8B0C47F67998",
+        "DATASET~55CAA090B9E8317AFC195CBD843D6665",
+        "DATASET~6BC1F571B76A5BBE6B1F6C0E7DCA533C"
+      ],
+      "transformation": "INSERT INTO `metaphor-data.test.yi_test3` \nSELECT * from `metaphor-data.test.yi_tests1` \nUNION ALL \nSELECT * from `metaphor-data.test.yi_tests2`"
+    },
     "upstream": {
       "sourceDatasets": [
         "DATASET~930E4BD28074A60959C98F61289311E0",
18 changes: 18 additions & 0 deletions tests/bigquery/lineage/data/view_result.json
@@ -4,6 +4,12 @@
       "name": "project1.dataset1.table1",
       "platform": "BIGQUERY"
     },
+    "entityUpstream": {
+      "sourceEntities": [
+        "DATASET~A6BA6F986B360A57CF65200F29F5B251"
+      ],
+      "transformation": "select * from `foo`"
+    },
     "upstream": {
       "sourceDatasets": [
         "DATASET~A6BA6F986B360A57CF65200F29F5B251"
@@ -16,6 +22,12 @@
       "name": "project1.dataset1.table2",
       "platform": "BIGQUERY"
     },
+    "entityUpstream": {
+      "sourceEntities": [
+        "DATASET~A6BA6F986B360A57CF65200F29F5B251"
+      ],
+      "transformation": "select * from `Foo`"
+    },
     "upstream": {
       "sourceDatasets": [
         "DATASET~A6BA6F986B360A57CF65200F29F5B251"
@@ -28,6 +40,12 @@
       "name": "project1.dataset1.table3",
       "platform": "BIGQUERY"
     },
+    "entityUpstream": {
+      "sourceEntities": [
+        "DATASET~A6BA6F986B360A57CF65200F29F5B251"
+      ],
+      "transformation": "select * from foo"
+    },
     "upstream": {
       "sourceDatasets": [
         "DATASET~A6BA6F986B360A57CF65200F29F5B251"