[sc-27944] Do not populate dataset in Tableau crawler #979

Merged
2 changes: 2 additions & 0 deletions metaphor/snowflake/extractor.py
@@ -902,6 +902,8 @@ def _init_dataset(
             database=database, schema=schema, table=table
         )
 
+        dataset.system_tags = SystemTags(tags=[])
+
         return dataset
 
     @staticmethod
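The Snowflake change is small but load-bearing: every dataset the extractor builds now starts with an empty, non-None tag list, so downstream tag handling can assume the field exists. A minimal, runnable sketch of the pattern — the dataclasses below are toy stand-ins for the real metaphor model classes, whose exact fields are assumed:

```python
from dataclasses import dataclass
from typing import List, Optional

# Toy stand-ins for the metaphor model classes (field names assumed).
@dataclass
class SystemTag:
    key: Optional[str]
    value: str

@dataclass
class SystemTags:
    tags: Optional[List[SystemTag]] = None

@dataclass
class Dataset:
    system_tags: Optional[SystemTags] = None

def init_dataset() -> Dataset:
    dataset = Dataset()
    # The line this PR adds: eager initialization means `tags` is always a
    # real list, never None, by the time tag processing runs.
    dataset.system_tags = SystemTags(tags=[])
    return dataset

assert init_dataset().system_tags.tags == []
```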
13 changes: 5 additions & 8 deletions metaphor/snowflake/utils.py
@@ -16,7 +16,6 @@
     SnowflakeStreamSourceType,
     SnowflakeStreamType,
     SystemTag,
-    SystemTags,
 )
 
 logger = logging.getLogger(__name__)
@@ -240,13 +239,11 @@ def _update_field_system_tag(field: SchemaField, system_tag: SystemTag) -> None:
 
 
 def append_dataset_system_tag(dataset: Dataset, system_tag: SystemTag) -> None:
-    assert dataset.schema is not None
-
-    if dataset.system_tags is None:
-        dataset.system_tags = SystemTags()
-    if dataset.system_tags.tags is None:
-        dataset.system_tags.tags = []
-
+    assert (
+        dataset.schema is not None
+        and dataset.system_tags
+        and dataset.system_tags.tags is not None
+    )
     # Always override existing tag, since we process database tags first, then schema tags,
     # and finally table tags
     other_tags = [t for t in dataset.system_tags.tags if t.key != system_tag.key]
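With the eager initialization above, the lazy None-handling collapses into a single assert, and the function's real job is the override rule spelled out in its comment: database tags are applied first, then schema tags, then table tags, so a later (more specific) tag with the same key must replace an earlier one. A runnable sketch of just that override rule, using a toy `SystemTag` (real fields assumed):

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class SystemTag:  # toy stand-in; the real model class has more fields
    key: Optional[str]
    value: str

def override_tag(tags: List[SystemTag], new_tag: SystemTag) -> List[SystemTag]:
    # Drop any tag that shares the new tag's key, then append the new tag,
    # so the most specific (latest-processed) tag wins.
    return [t for t in tags if t.key != new_tag.key] + [new_tag]

tags: List[SystemTag] = []
tags = override_tag(tags, SystemTag("owner", "database_team"))  # database-level tag
tags = override_tag(tags, SystemTag("owner", "table_team"))     # table-level tag wins
assert tags == [SystemTag("owner", "table_team")]
```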
39 changes: 4 additions & 35 deletions metaphor/tableau/extractor.py
@@ -16,7 +16,6 @@
 from metaphor.common.entity_id import (
     EntityId,
     to_dataset_entity_id,
-    to_dataset_entity_id_from_logical_id,
     to_virtual_view_entity_id,
 )
 from metaphor.common.event_util import ENTITY_TYPES
@@ -31,8 +30,6 @@
     DashboardLogicalID,
     DashboardPlatform,
     DataPlatform,
-    Dataset,
-    DatasetLogicalID,
     EntityUpstream,
     SourceInfo,
     SystemContact,
@@ -99,7 +96,6 @@ def __init__(self, config: TableauRunConfig):
 
         self._views: Dict[str, tableau.ViewItem] = {}
         self._projects: Dict[str, List[str]] = {}  # project id -> project hierarchy
-        self._datasets: Dict[EntityId, Dataset] = {}
         self._virtual_views: Dict[str, VirtualView] = {}
         self._dashboards: Dict[str, Dashboard] = {}
 
@@ -144,7 +140,6 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
         return [
             *self._dashboards.values(),
             *self._virtual_views.values(),
-            *self._datasets.values(),
         ]
 
     def _extract_dashboards(
@@ -374,7 +369,6 @@ def _parse_custom_sql_table(
                 logger.warning(f"Ignore non-fully qualified source table {fullname}")
                 continue
 
-            self._init_dataset(fullname, platform, account, None)
             upstream_datasets.append(
                 str(to_dataset_entity_id(fullname, platform, account))
             )
@@ -432,9 +426,7 @@ def _parse_workbook_query_response(
             # if source_datasets is None or empty from custom SQL, use the upstreamTables of the datasource
             source_datasets = (
                 custom_sql_source.sources if custom_sql_source else None
-            ) or self._parse_upstream_datasets(
-                published_source.upstreamTables, system_tags
-            )
+            ) or self._parse_upstream_datasets(published_source.upstreamTables)
 
             full_name, structure = self._build_asset_full_name_and_structure(
                 published_source.name,
@@ -498,9 +490,7 @@ def _parse_workbook_query_response(
             # if source_datasets is None or empty from custom SQL, use the upstreamTables of the datasource
             source_datasets = (
                 custom_sql_source.sources if custom_sql_source else None
-            ) or self._parse_upstream_datasets(
-                embedded_source.upstreamTables, system_tags
-            )
+            ) or self._parse_upstream_datasets(embedded_source.upstreamTables)
 
             self._virtual_views[embedded_source.id] = VirtualView(
                 logical_id=VirtualViewLogicalID(
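Both call sites (published and embedded data sources) lean on the same Python idiom, noted in the in-code comment: `None` and an empty list are both falsy, so a single `or` falls back to the data source's `upstreamTables` whenever custom SQL yields nothing. A quick, self-contained check of that behavior:

```python
def pick_sources(custom_sql_sources, upstream):
    # `or` treats None and [] the same: both are falsy, so either way the
    # fallback to the data source's upstream tables kicks in.
    return custom_sql_sources or upstream

assert pick_sources(None, ["db.schema.t"]) == ["db.schema.t"]  # no custom SQL
assert pick_sources([], ["db.schema.t"]) == ["db.schema.t"]    # custom SQL, no sources
assert pick_sources(["a.b.c"], ["db.schema.t"]) == ["a.b.c"]   # custom SQL wins
```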
@@ -545,11 +535,8 @@ def _parse_workbook_query_response(
     def _parse_upstream_datasets(
         self,
         upstreamTables: List[DatabaseTable],
-        system_tags: Optional[SystemTags],
     ) -> List[str]:
-        upstream_datasets = [
-            self._parse_dataset_id(table, system_tags) for table in upstreamTables
-        ]
+        upstream_datasets = [self._parse_dataset_id(table) for table in upstreamTables]
         return list(
             set(
                 [
@@ -560,9 +547,7 @@ def _parse_upstream_datasets(
             )
         )
 
-    def _parse_dataset_id(
-        self, table: DatabaseTable, system_tags: Optional[SystemTags]
-    ) -> Optional[EntityId]:
+    def _parse_dataset_id(self, table: DatabaseTable) -> Optional[EntityId]:
         if (
             not table.name
             or not table.schema_
@@ -613,7 +598,6 @@ def _parse_dataset_id(
         )
 
         logger.debug(f"dataset id: {fullname} {connection_type} {account}")
-        self._init_dataset(fullname, platform, account, system_tags)
         return to_dataset_entity_id(fullname, platform, account)
 
     def _parse_chart(self, view: tableau.ViewItem) -> Chart:
@@ -636,21 +620,6 @@ def _parse_chart(self, view: tableau.ViewItem) -> Chart:
 
     _workbook_url_regex = r".+\/workbooks\/(\d+)(\/.*)?"
 
-    def _init_dataset(
-        self,
-        normalized_name: str,
-        platform: DataPlatform,
-        account: Optional[str],
-        system_tags: Optional[SystemTags],
-    ) -> Dataset:
-        dataset = Dataset()
-        dataset.logical_id = DatasetLogicalID(
-            name=normalized_name, account=account, platform=platform
-        )
-        dataset.system_tags = system_tags
-        entity_id = to_dataset_entity_id_from_logical_id(dataset.logical_id)
-        return self._datasets.setdefault(entity_id, dataset)
-
     @staticmethod
     def _extract_workbook_id(workbook_url: str) -> str:
         """Extracts the workbook ID from a workbook URL"""
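Taken together, the extractor-side deletions reduce upstream handling to pure ID computation: `_parse_dataset_id` maps a Tableau `DatabaseTable` to an entity ID (or `None`), and `_parse_upstream_datasets` deduplicates the results; no `Dataset` entity is created or emitted as a side effect, leaving dataset metadata (including tags) to the warehouse crawlers. A simplified, self-contained sketch of that flow — toy types; the real code also resolves platform, account, and fully qualified names:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class DatabaseTable:  # toy stand-in for Tableau's GraphQL table model
    name: Optional[str]
    schema_: Optional[str]
    database: Optional[str]

def parse_dataset_id(table: DatabaseTable) -> Optional[str]:
    # Unresolvable tables yield None instead of a half-built ID.
    if not table.name or not table.schema_ or not table.database:
        return None
    return f"{table.database}.{table.schema_}.{table.name}".lower()

def parse_upstream_datasets(tables: List[DatabaseTable]) -> List[str]:
    # IDs only, deduplicated; no Dataset entities are registered here.
    return sorted({i for i in (parse_dataset_id(t) for t in tables) if i is not None})

tables = [
    DatabaseTable("cycle", "london", "dev_db"),
    DatabaseTable("cycle", "london", "dev_db"),  # duplicate collapses
    DatabaseTable("orphan", None, None),         # dropped: not fully qualified
]
assert parse_upstream_datasets(tables) == ["dev_db.london.cycle"]
```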
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.14.100"
+version = "0.14.101"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor <[email protected]>"]
2 changes: 1 addition & 1 deletion tests/snowflake/test_extractor.py
@@ -434,7 +434,7 @@ def test_fetch_hierarchy_system_tags(mock_connect: MagicMock):
 
     extractor._fetch_tags(mock_cursor)
 
-    assert dataset.system_tags is None
+    assert dataset.system_tags and dataset.system_tags.tags is not None
     assert extractor._hierarchies.get(dataset_normalized_name(table_name)) is not None
     db_hierarchy = extractor._hierarchies[dataset_normalized_name(table_name)]
     assert db_hierarchy.logical_id == HierarchyLogicalID(
43 changes: 0 additions & 43 deletions tests/tableau/expected.json
@@ -170,48 +170,5 @@
       ],
       "url": "https://10ax.online.tableau.com/#/site/abc/workbooks/123"
     }
-  },
-  {
-    "logicalId": {
-      "name": "db.schema.table",
-      "platform": "BIGQUERY"
-    }
-  },
-  {
-    "logicalId": {
-      "account": "snow",
-      "name": "dev_db.london.cycle",
-      "platform": "SNOWFLAKE"
-    },
-    "systemTags": {
-      "tags": [
-        {
-          "systemTagSource": "TABLEAU",
-          "value": "bar"
-        },
-        {
-          "systemTagSource": "TABLEAU",
-          "value": "foo"
-        }
-      ]
-    }
-  },
-  {
-    "logicalId": {
-      "name": "acme.berlin_bicycles.cycle_hire",
-      "platform": "REDSHIFT"
-    },
-    "systemTags": {
-      "tags": [
-        {
-          "systemTagSource": "TABLEAU",
-          "value": "bar"
-        },
-        {
-          "systemTagSource": "TABLEAU",
-          "value": "foo"
-        }
-      ]
-    }
-  }
   }
 ]
5 changes: 0 additions & 5 deletions tests/tableau/test_extractor.py
@@ -157,7 +157,6 @@ def test_parse_database_table():
                 database=None,
                 schema=None,
             ),
-            system_tags=None,
         )
         is None
     )
@@ -171,7 +170,6 @@
                 database=Database(name="db", connectionType="invalid_type"),
                 schema=None,
             ),
-            system_tags=None,
         )
         is None
     )
@@ -185,7 +183,6 @@
             database=Database(name="db", connectionType="redshift"),
             schema="foo",
         ),
-        system_tags=None,
     ) == to_dataset_entity_id("db.schema.table", DataPlatform.REDSHIFT)
 
     # Fall back to two-segment "name"
@@ -197,7 +194,6 @@
             database=Database(name="db", connectionType="redshift"),
             schema="foo",
         ),
-        system_tags=None,
     ) == to_dataset_entity_id("db.schema.table", DataPlatform.REDSHIFT)
 
     # Test BigQuery project name => ID mapping
@@ -218,7 +214,6 @@
             database=Database(name="bq_name", connectionType="bigquery"),
             schema="foo",
         ),
-        system_tags=None,
     ) == to_dataset_entity_id("bq_id.schema.table", DataPlatform.BIGQUERY)
 
 