diff --git a/metaphor/snowflake/README.md b/metaphor/snowflake/README.md
index 2a7a6ba9..9286524a 100644
--- a/metaphor/snowflake/README.md
+++ b/metaphor/snowflake/README.md
@@ -50,6 +50,10 @@ grant references on future materialized views in database identifier($db) to rol
 -- (Optional) Grant privilege to "show streams"
 grant select on all streams in database identifier($db) to role identifier($role);
 grant select on future streams in database identifier($db) to role identifier($role);
+
+-- (Optional) Grant privilege to "show iceberg tables"
+grant select on all iceberg tables in database identifier($db) to role identifier($role);
+grant select on future iceberg tables in database identifier($db) to role identifier($role);
 ```

 ### Key Pair Authentication (Optional)
diff --git a/metaphor/snowflake/extractor.py b/metaphor/snowflake/extractor.py
index 6c81a6b8..d46cad1a 100644
--- a/metaphor/snowflake/extractor.py
+++ b/metaphor/snowflake/extractor.py
@@ -56,6 +56,8 @@
     HierarchyLogicalID,
     QueryLog,
     SchemaType,
+    SnowflakeIcebergInfo,
+    SnowflakeIcebergTableType,
     SnowflakeStreamInfo,
     SourceInfo,
     SQLSchema,
@@ -165,9 +167,10 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
                     f"Failed to fetch table extra info for '{database}'\n{e}"
                 )

-            if self._streams_enabled:
-                for schema in self._fetch_schemas(cursor):
+            for schema in self._fetch_schemas(cursor):
+                if self._streams_enabled:
                     self._fetch_streams(cursor, database, schema)
+                self._fetch_iceberg_tables(cursor, database, schema)

         self._fetch_primary_keys(cursor)
         self._fetch_unique_keys(cursor)
@@ -709,6 +712,49 @@ def _to_dataset_eid(x: str) -> str:

             logger.info(f"Found {count} stream tables in {database}.{schema}")

+    def _fetch_iceberg_tables(
+        self, cursor: SnowflakeCursor, database: str, schema: str
+    ):
+        try:
+            cursor.execute(f"SHOW ICEBERG TABLES IN {schema}")
+        except Exception:
+            # Most likely due to a permission issue
+            logger.exception(f"Failed to show iceberg tables in '{schema}'")
+            return
+
+        count = 0
+        for entry in cursor:
+            (
+                table_name,
+                external_volume_name,
+                iceberg_table_type,
+            ) = (
+                entry[1],
+                entry[5],
+                entry[7],
+            )
+
+            normalized_name = dataset_normalized_name(database, schema, table_name)
+            dataset = self._datasets.get(normalized_name)
+
+            if dataset is None:
+                logger.warning(f"Unable to find dataset {normalized_name}")
+                continue
+
+            dataset.snowflake_iceberg_info = SnowflakeIcebergInfo(
+                external_volume_name=external_volume_name,
+                iceberg_table_type=(
+                    SnowflakeIcebergTableType.UNKNOWN
+                    if iceberg_table_type
+                    not in (item.value for item in SnowflakeIcebergTableType)
+                    else SnowflakeIcebergTableType[iceberg_table_type]
+                ),
+            )
+
+            count += 1
+
+        logger.info(f"Found {count} iceberg tables in {database}.{schema}")
+
     def _fetch_stream_row_count(self, stream_name) -> Optional[int]:
         with self._conn.cursor() as cursor:
             try:
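Aside: the enum-coercion guard in `_fetch_iceberg_tables` above checks the raw `SHOW ICEBERG TABLES` value against the enum's *values* but then looks the member up by *name*, which only lines up when names and values coincide. Below is a minimal standalone sketch of that pattern; the stand-in enum is hypothetical, as only `MANAGED` and `UNKNOWN` are confirmed members in this diff:

```python
from enum import Enum


# Hypothetical stand-in for metaphor-models' SnowflakeIcebergTableType;
# only MANAGED and UNKNOWN are confirmed by this diff.
class IcebergTableType(Enum):
    MANAGED = "MANAGED"
    UNMANAGED = "UNMANAGED"
    UNKNOWN = "UNKNOWN"


def coerce(raw: str) -> IcebergTableType:
    # Guard on values, then index by name -- safe only because each
    # member's name equals its value, mirroring the connector code above.
    if raw not in (item.value for item in IcebergTableType):
        return IcebergTableType.UNKNOWN
    return IcebergTableType[raw]


assert coerce("MANAGED") is IcebergTableType.MANAGED
assert coerce("something-else") is IcebergTableType.UNKNOWN
```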
diff --git a/poetry.lock b/poetry.lock
index 00a6046d..f3923bde 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.

 [[package]]
 name = "aiohappyeyeballs"
@@ -3258,13 +3258,13 @@ files = [

 [[package]]
 name = "metaphor-models"
-version = "0.40.1"
+version = "0.40.2"
 description = ""
 optional = false
 python-versions = "<4.0,>=3.8"
 files = [
-    {file = "metaphor_models-0.40.1-py3-none-any.whl", hash = "sha256:29944126e4d24cb03247c28e65877a60c9dc548e1a0f85181e7f957cc34396c3"},
-    {file = "metaphor_models-0.40.1.tar.gz", hash = "sha256:6aeb102269e1b95795f722b1cbac12e58ab48d57b6065f1d1085c9c3d18f196f"},
+    {file = "metaphor_models-0.40.2-py3-none-any.whl", hash = "sha256:159aca34cb3ce2936e8ef3a88a65f2890f74d098a2d164a2f97c16f9f7602bed"},
+    {file = "metaphor_models-0.40.2.tar.gz", hash = "sha256:81d11a2a91cc63a1f50b099e7365ee83fa1ffb964dcbb46246b4565096bbc18b"},
 ]

 [[package]]
@@ -6748,4 +6748,4 @@ unity-catalog = ["databricks-sdk", "databricks-sql-connector", "sqlglot"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.8.1,<3.12"
-content-hash = "ddd5e59216c0baa2fc758bb969a9343dfb60df348d450557385c8c52c173a059"
+content-hash = "02b2607b94c52edb3d0ef10578be05296ac0bd3d8efa14d2edf37fced349c38c"
diff --git a/pyproject.toml b/pyproject.toml
index 058fc5a1..41f4f70e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.14.111"
+version = "0.14.112"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor "]
@@ -42,7 +42,7 @@ llama-index-readers-confluence = { version = "^0.1.4", optional = true }
 llama-index-readers-notion = { version = "^0.1.6", optional = true }
 looker-sdk = { version = "^24.2.0", optional = true }
 lxml = { version = "~=5.0.0", optional = true }
-metaphor-models = "0.40.1"
+metaphor-models = "0.40.2"
 more-itertools = { version = "^10.1.0", optional = true }
 msal = { version = "^1.28.0", optional = true }
 msgraph-beta-sdk = { version = "~1.4.0", optional = true }
diff --git a/tests/snowflake/test_extractor.py b/tests/snowflake/test_extractor.py
index 33b2266c..85d5c750 100644
--- a/tests/snowflake/test_extractor.py
+++ b/tests/snowflake/test_extractor.py
@@ -14,6 +14,8 @@
     MaterializationType,
     QueriedDataset,
     SchemaField,
+    SnowflakeIcebergInfo,
+    SnowflakeIcebergTableType,
     SnowflakeStreamInfo,
     SnowflakeStreamSourceType,
     SnowflakeStreamType,
@@ -1178,3 +1180,46 @@ def test_fetch_schema_comment(mock_connect: MagicMock) -> None:
     assert hierarchy
     assert hierarchy.system_description.description == "desc"
     assert hierarchy.system_description.platform == AssetPlatform.SNOWFLAKE
+
+
+@patch("metaphor.snowflake.auth.connect")
+def test_fetch_iceberg_table(mock_connect: MagicMock) -> None:
+    mock_cursor = MagicMock()
+
+    mock_cursor.__iter__.return_value = iter(
+        [
+            (
+                "dont care",
+                "TABLE",  # table_name
+                "dont care",
+                "dont care",
+                "dont care",
+                "external_volume",  # external_volume_name
+                "dont care",
+                "MANAGED",  # table_type
+            ),
+        ]
+    )
+
+    extractor = SnowflakeExtractor(make_snowflake_config())
+    extractor._datasets["db.schema.table"] = extractor._init_dataset(
+        database="DB",
+        schema="SCHEMA",
+        table="TABLE",
+        table_type="BASE TABLE",
+        comment="",
+        row_count=None,
+        table_bytes=None,
+    )
+
+    extractor._fetch_iceberg_tables(mock_cursor, "DB", "SCHEMA")
+
+    normalized_name = dataset_normalized_name("DB", "SCHEMA", "TABLE")
+    assert normalized_name in extractor._datasets
+    assert len(extractor._datasets) == 1
+
+    dataset = extractor._datasets[normalized_name]
+    assert dataset.snowflake_iceberg_info == SnowflakeIcebergInfo(
+        iceberg_table_type=SnowflakeIcebergTableType.MANAGED,
+        external_volume_name="external_volume",
+    )
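For reference, the new test leans on `MagicMock`'s magic-method support: assigning to `mock_cursor.__iter__` is what lets the `for entry in cursor` loop inside `_fetch_iceberg_tables` consume the canned `SHOW ICEBERG TABLES` rows. A minimal sketch of just that mechanism:

```python
from unittest.mock import MagicMock

cursor = MagicMock()
cursor.__iter__.return_value = iter([("row-a",), ("row-b",)])

# Iterating the mock drains the canned iterator, just like the
# `for entry in cursor` loop in the extractor.
assert [entry for entry in cursor] == [("row-a",), ("row-b",)]

# The same iterator object is returned on every __iter__ call, so a
# second pass sees it already exhausted.
assert list(cursor) == []
```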
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 241383f3..01da2f33 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -16,7 +16,8 @@ def load_json(path, events: Optional[List[Dict[str, Any]]] = None):
     """
     if events:
        with open(path, "w") as f:
-            f.write(json.dumps(events, indent=4))
+            f.write(json.dumps(events, indent=2, sort_keys=True))
+            f.write("\n")

     with open(path, "r") as f:
         return json.load(f)
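A small illustration of why the serialization tweak helps, assuming the intent is deterministic golden files: with `sort_keys=True`, key order in the recorded events no longer affects the bytes written, and the extra `f.write("\n")` gives the files a POSIX-friendly trailing newline:

```python
import json

a = json.dumps({"b": 1, "a": 2}, indent=2, sort_keys=True) + "\n"
b = json.dumps({"a": 2, "b": 1}, indent=2, sort_keys=True) + "\n"

# Identical output regardless of insertion order, so golden-file
# diffs only show real changes.
assert a == b
assert a.endswith("\n")
```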