Skip to content

Commit

Permalink
Extract Snowflake database/schema level comment (#863)
Browse files Browse the repository at this point in the history
* Fetch comment for DB/schema

* Add tests

* Bump version

* Update deps
  • Loading branch information
elic-eon authored May 20, 2024
1 parent 0964991 commit 7a69a4e
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 15 deletions.
73 changes: 64 additions & 9 deletions metaphor/snowflake/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from metaphor.common.utils import chunks, safe_float, start_of_day
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
AssetPlatform,
DataPlatform,
Dataset,
DatasetLogicalID,
Expand All @@ -54,6 +55,7 @@
SnowflakeStreamInfo,
SourceInfo,
SQLSchema,
SystemDescription,
SystemTag,
SystemTags,
SystemTagSource,
Expand Down Expand Up @@ -141,6 +143,9 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
logger.info(f"Skip empty database {database}")
continue

self._fetch_database_comment(cursor)
self._fetch_schemas_comment(cursor)

logger.info(f"Include {len(tables)} tables from {database}")

self._fetch_columns(cursor, database)
Expand Down Expand Up @@ -487,18 +492,22 @@ def _add_hierarchy_system_tag(
hierarchy_key = dataset_normalized_name(database, object_name)
logical_id = HierarchyLogicalID(path=path)

self._hierarchies.setdefault(
hierarchy = self._hierarchies.setdefault(
hierarchy_key,
Hierarchy(
logical_id=logical_id, system_tags=SystemTags(tags=[])
), # SystemTags.tags should never be empty
).system_tags.tags.append(
SystemTag(
key=tag_key,
system_tag_source=SystemTagSource.SNOWFLAKE,
value=tag_value,
)
logical_id=logical_id,
),
)

system_tag = SystemTag(
key=tag_key,
system_tag_source=SystemTagSource.SNOWFLAKE,
value=tag_value,
)
if hierarchy.system_tags:
hierarchy.system_tags.tags.append(system_tag)
else:
hierarchy.system_tags = SystemTags(tags=[system_tag])

def _fetch_tags(self, cursor: SnowflakeCursor) -> None:
query = """
Expand Down Expand Up @@ -949,3 +958,49 @@ def _parse_accessed_objects(self, raw_objects: str) -> List[QueriedDataset]:
)

return queried_datasets

def _add_hierarchy_comment(
    self,
    database: str,
    comment: str,
    schema: Optional[str] = None,
) -> None:
    """
    Store a database-level or schema-level comment as the system description
    of the matching hierarchy entry.

    Nothing happens when `comment` is empty or None. Otherwise the entry in
    `self._hierarchies` for the database (or database + schema) is created on
    first use, and its `system_description` is overwritten with the comment.
    """

    if not comment:
        return

    # Logical path: platform first, then the lower-cased database, then the
    # lower-cased schema when one is given.
    segments = [DataPlatform.SNOWFLAKE.value, database.lower()]
    if schema:
        segments.append(schema.lower())
    logical_id = HierarchyLogicalID(path=segments)

    key = dataset_normalized_name(database, schema)

    # setdefault only inserts the new Hierarchy when the key is not present,
    # so an existing entry is reused and merely receives the description.
    fresh_entry = Hierarchy(
        logical_id=logical_id,
    )
    entry = self._hierarchies.setdefault(key, fresh_entry)
    entry.system_description = SystemDescription(
        platform=AssetPlatform.SNOWFLAKE, description=comment
    )

def _fetch_database_comment(self, cursor: SnowflakeCursor) -> None:
    """
    Read database comments from `information_schema.databases` and record
    each one through `_add_hierarchy_comment`.
    """
    # NOTE(review): the query has no WHERE clause, so every row the view
    # returns is recorded - confirm this is intended for per-database calls.
    query = "SELECT database_name, comment FROM information_schema.databases"
    cursor.execute(query)

    for name, text in cursor:
        self._add_hierarchy_comment(name, text)

def _fetch_schemas_comment(self, cursor: SnowflakeCursor) -> None:
    """
    Read schema comments from `information_schema.schemata` and record each
    one through `_add_hierarchy_comment`.

    Rows whose schema name is INFORMATION_SCHEMA are excluded by the query.
    """
    query = "SELECT catalog_name, schema_name, comment FROM information_schema.schemata WHERE schema_name != 'INFORMATION_SCHEMA'"
    cursor.execute(query)

    for row in cursor:
        # Each row is (catalog_name, schema_name, comment).
        catalog, schema_name, text = row
        self._add_hierarchy_comment(catalog, schema=schema_name, comment=text)
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.5"
version = "0.14.6"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down Expand Up @@ -41,7 +41,7 @@ llama-index-readers-confluence = { version = "^0.1.4", optional = true }
llama-index-readers-notion = { version = "^0.1.6", optional = true }
looker-sdk = { version = "^24.2.0", optional = true }
lxml = { version = "~=5.0.0", optional = true }
metaphor-models = "0.34.1"
metaphor-models = "0.35.1"
more-itertools = { version = "^10.1.0", optional = true }
msal = { version = "^1.28.0", optional = true }
msgraph-beta-sdk = { version = "1.2.0", optional = true }
Expand Down
41 changes: 41 additions & 0 deletions tests/snowflake/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from metaphor.common.entity_id import dataset_normalized_name
from metaphor.common.filter import DatasetFilter
from metaphor.models.metadata_change_event import (
AssetPlatform,
DataPlatform,
DatasetLogicalID,
HierarchyLogicalID,
Expand Down Expand Up @@ -1137,3 +1138,43 @@ def test_fetch_tags_override(mock_connect: MagicMock) -> None:
)
assert field
assert field.tags == ["TEST_TAG=db_tag"]


@patch("metaphor.snowflake.auth.connect")
def test_fetch_database_comment(mock_connect: MagicMock) -> None:
    """
    A (database, comment) row produces a single hierarchy entry, looked up by
    the lower-cased database name, that carries the comment as its Snowflake
    system description.
    """
    rows = [
        ("DATABASE", "desc"),
    ]
    cursor = MagicMock()
    # Iterating the mocked cursor yields the canned result rows.
    cursor.__iter__.return_value = iter(rows)

    extractor = SnowflakeExtractor(make_snowflake_config())
    extractor._fetch_database_comment(cursor)

    assert len(extractor._hierarchies) == 1

    hierarchy = extractor._hierarchies.get("database")
    assert hierarchy

    description = hierarchy.system_description
    assert description.description == "desc"
    assert description.platform == AssetPlatform.SNOWFLAKE


@patch("metaphor.snowflake.auth.connect")
def test_fetch_schema_comment(mock_connect: MagicMock) -> None:
    """
    A (database, schema, comment) row produces a single hierarchy entry,
    looked up by the lower-cased "database.schema" key, that carries the
    comment as its Snowflake system description.
    """
    rows = [
        ("DATABASE", "SCHEMA", "desc"),
    ]
    cursor = MagicMock()
    # Iterating the mocked cursor yields the canned result rows.
    cursor.__iter__.return_value = iter(rows)

    extractor = SnowflakeExtractor(make_snowflake_config())
    extractor._fetch_schemas_comment(cursor)

    assert len(extractor._hierarchies) == 1

    hierarchy = extractor._hierarchies.get("database.schema")
    assert hierarchy

    description = hierarchy.system_description
    assert description.description == "desc"
    assert description.platform == AssetPlatform.SNOWFLAKE

0 comments on commit 7a69a4e

Please sign in to comment.