diff --git a/metaphor/snowflake/extractor.py b/metaphor/snowflake/extractor.py index c4f73732..d4b19b0b 100644 --- a/metaphor/snowflake/extractor.py +++ b/metaphor/snowflake/extractor.py @@ -902,6 +902,8 @@ def _init_dataset( database=database, schema=schema, table=table ) + dataset.system_tags = SystemTags(tags=[]) + return dataset @staticmethod diff --git a/metaphor/snowflake/utils.py b/metaphor/snowflake/utils.py index 13e79af3..69dd8fc3 100644 --- a/metaphor/snowflake/utils.py +++ b/metaphor/snowflake/utils.py @@ -16,7 +16,6 @@ SnowflakeStreamSourceType, SnowflakeStreamType, SystemTag, - SystemTags, ) logger = logging.getLogger(__name__) @@ -240,13 +239,11 @@ def _update_field_system_tag(field: SchemaField, system_tag: SystemTag) -> None: def append_dataset_system_tag(dataset: Dataset, system_tag: SystemTag) -> None: - assert dataset.schema is not None - - if dataset.system_tags is None: - dataset.system_tags = SystemTags() - if dataset.system_tags.tags is None: - dataset.system_tags.tags = [] - + assert ( + dataset.schema is not None + and dataset.system_tags + and dataset.system_tags.tags is not None + ) # Always override exisiting tag, since we process database tags first, then schema tags and # then finally table tags other_tags = [t for t in dataset.system_tags.tags if t.key != system_tag.key] diff --git a/metaphor/tableau/extractor.py b/metaphor/tableau/extractor.py index 129bfeb8..b9afbb2e 100644 --- a/metaphor/tableau/extractor.py +++ b/metaphor/tableau/extractor.py @@ -16,7 +16,6 @@ from metaphor.common.entity_id import ( EntityId, to_dataset_entity_id, - to_dataset_entity_id_from_logical_id, to_virtual_view_entity_id, ) from metaphor.common.event_util import ENTITY_TYPES @@ -31,8 +30,6 @@ DashboardLogicalID, DashboardPlatform, DataPlatform, - Dataset, - DatasetLogicalID, EntityUpstream, SourceInfo, SystemContact, @@ -99,7 +96,6 @@ def __init__(self, config: TableauRunConfig): self._views: Dict[str, tableau.ViewItem] = {} self._projects: Dict[str, List[str]] = {} # project id -> project hierarchy - self._datasets: Dict[EntityId, Dataset] = {} self._virtual_views: Dict[str, VirtualView] = {} self._dashboards: Dict[str, Dashboard] = {} @@ -144,7 +140,6 @@ async def extract(self) -> Collection[ENTITY_TYPES]: return [ *self._dashboards.values(), *self._virtual_views.values(), - *self._datasets.values(), ] def _extract_dashboards( @@ -374,7 +369,6 @@ def _parse_custom_sql_table( logger.warning(f"Ignore non-fully qualified source table {fullname}") continue - self._init_dataset(fullname, platform, account, None) upstream_datasets.append( str(to_dataset_entity_id(fullname, platform, account)) ) @@ -432,9 +426,7 @@ def _parse_workbook_query_response( # if source_datasets is None or empty from custom SQL, use the upstreamTables of the datasource source_datasets = ( custom_sql_source.sources if custom_sql_source else None - ) or self._parse_upstream_datasets( - published_source.upstreamTables, system_tags - ) + ) or self._parse_upstream_datasets(published_source.upstreamTables) full_name, structure = self._build_asset_full_name_and_structure( published_source.name, @@ -498,9 +490,7 @@ def _parse_workbook_query_response( # if source_datasets is None or empty from custom SQL, use the upstreamTables of the datasource source_datasets = ( custom_sql_source.sources if custom_sql_source else None - ) or self._parse_upstream_datasets( - embedded_source.upstreamTables, system_tags - ) + ) or self._parse_upstream_datasets(embedded_source.upstreamTables) self._virtual_views[embedded_source.id] = VirtualView( logical_id=VirtualViewLogicalID( @@ -545,11 +535,8 @@ def _parse_workbook_query_response( def _parse_upstream_datasets( self, upstreamTables: List[DatabaseTable], - system_tags: Optional[SystemTags], ) -> List[str]: - upstream_datasets = [ - self._parse_dataset_id(table, system_tags) for table in upstreamTables - ] + upstream_datasets = [self._parse_dataset_id(table) for table in upstreamTables] return list( set( [ @@ -560,9 +547,7 @@ def _parse_upstream_datasets( ) ) - def _parse_dataset_id( - self, table: DatabaseTable, system_tags: Optional[SystemTags] - ) -> Optional[EntityId]: + def _parse_dataset_id(self, table: DatabaseTable) -> Optional[EntityId]: if ( not table.name or not table.schema_ @@ -613,7 +598,6 @@ def _parse_dataset_id( ) logger.debug(f"dataset id: {fullname} {connection_type} {account}") - self._init_dataset(fullname, platform, account, system_tags) return to_dataset_entity_id(fullname, platform, account) def _parse_chart(self, view: tableau.ViewItem) -> Chart: @@ -636,21 +620,6 @@ def _parse_chart(self, view: tableau.ViewItem) -> Chart: _workbook_url_regex = r".+\/workbooks\/(\d+)(\/.*)?" - def _init_dataset( - self, - normalized_name: str, - platform: DataPlatform, - account: Optional[str], - system_tags: Optional[SystemTags], - ) -> Dataset: - dataset = Dataset() - dataset.logical_id = DatasetLogicalID( - name=normalized_name, account=account, platform=platform - ) - dataset.system_tags = system_tags - entity_id = to_dataset_entity_id_from_logical_id(dataset.logical_id) - return self._datasets.setdefault(entity_id, dataset) - @staticmethod def _extract_workbook_id(workbook_url: str) -> str: """Extracts the workbook ID from a workbook URL""" diff --git a/pyproject.toml b/pyproject.toml index d6c69dc7..1d6c4289 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.100" +version = "0.14.101" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/snowflake/test_extractor.py b/tests/snowflake/test_extractor.py index bf6e1368..33b2266c 100644 --- a/tests/snowflake/test_extractor.py +++ b/tests/snowflake/test_extractor.py @@ -434,7 +434,7 @@ def test_fetch_hierarchy_system_tags(mock_connect: MagicMock): extractor._fetch_tags(mock_cursor) - assert dataset.system_tags is None + assert dataset.system_tags and dataset.system_tags.tags is not None assert extractor._hierarchies.get(dataset_normalized_name(table_name)) is not None db_hierarchy = extractor._hierarchies[dataset_normalized_name(table_name)] assert db_hierarchy.logical_id == HierarchyLogicalID( diff --git a/tests/tableau/expected.json b/tests/tableau/expected.json index 9de17c2a..48c6e2bd 100644 --- a/tests/tableau/expected.json +++ b/tests/tableau/expected.json @@ -170,48 +170,5 @@ ], "url": "https://10ax.online.tableau.com/#/site/abc/workbooks/123" } - }, - { - "logicalId": { - "name": "db.schema.table", - "platform": "BIGQUERY" - } - }, - { - "logicalId": { - "account": "snow", - "name": "dev_db.london.cycle", - "platform": "SNOWFLAKE" - }, - "systemTags": { - "tags": [ - { - "systemTagSource": "TABLEAU", - "value": "bar" - }, - { - "systemTagSource": "TABLEAU", - "value": "foo" - } - ] - } - }, - { - "logicalId": { - "name": "acme.berlin_bicycles.cycle_hire", - "platform": "REDSHIFT" - }, - "systemTags": { - "tags": [ - { - "systemTagSource": "TABLEAU", - "value": "bar" - }, - { - "systemTagSource": "TABLEAU", - "value": "foo" - } - ] - } } ] diff --git a/tests/tableau/test_extractor.py b/tests/tableau/test_extractor.py index bc49da5d..0ffaba4d 100644 --- a/tests/tableau/test_extractor.py +++ b/tests/tableau/test_extractor.py @@ -157,7 +157,6 @@ def test_parse_database_table(): database=None, schema=None, ), - system_tags=None, ) is None ) @@ -171,7 +170,6 @@ def test_parse_database_table(): database=Database(name="db", connectionType="invalid_type"), schema=None, ), - system_tags=None, ) is None ) @@ -185,7 +183,6 @@ def test_parse_database_table(): database=Database(name="db", connectionType="redshift"), schema="foo", ), - system_tags=None, ) == to_dataset_entity_id("db.schema.table", DataPlatform.REDSHIFT) # Full back to two-segment "name" @@ -197,7 +194,6 @@ def test_parse_database_table(): database=Database(name="db", connectionType="redshift"), schema="foo", ), - system_tags=None, ) == to_dataset_entity_id("db.schema.table", DataPlatform.REDSHIFT) # Test BigQuery project name => ID mapping @@ -218,7 +214,6 @@ def test_parse_database_table(): database=Database(name="bq_name", connectionType="bigquery"), schema="foo", ), - system_tags=None, ) == to_dataset_entity_id("bq_id.schema.table", DataPlatform.BIGQUERY)