Extract iceberg table (#993)
* Extract iceberg table

* Bump version

* Add test

* Format expected json

* Bump version

* Revert expected

* Fix bad merge
elic-eon authored Oct 2, 2024
1 parent 460066e commit 00d72f9
Showing 6 changed files with 106 additions and 10 deletions.
4 changes: 4 additions & 0 deletions metaphor/snowflake/README.md
@@ -50,6 +50,10 @@ grant references on future materialized views in database identifier($db) to role identifier($role);
 -- (Optional) Grant privilege to "show streams"
 grant select on all streams in database identifier($db) to role identifier($role);
 grant select on future streams in database identifier($db) to role identifier($role);
+
+-- (Optional) Grant privilege to "show iceberg tables"
+grant select on all iceberg tables in database identifier($db) to role identifier($role);
+grant select on future iceberg tables in database identifier($db) to role identifier($role);
 ```

 ### Key Pair Authentication (Optional)
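For reference, these optional grants can also be scripted. A minimal sketch using the Snowflake Python connector, in which every account, database, and role value is a placeholder rather than part of this change:

```
# Sketch only: applying the optional iceberg-table grants via the
# Snowflake Python connector. All connection values are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account="my_account",  # placeholder
    user="admin_user",     # placeholder
    password="...",        # placeholder
)
with conn.cursor() as cur:
    # Session variables consumed by identifier($db) / identifier($role)
    cur.execute("SET db = 'ACME_DB'")          # placeholder database
    cur.execute("SET role = 'METAPHOR_ROLE'")  # placeholder role
    cur.execute(
        "grant select on all iceberg tables in database identifier($db) "
        "to role identifier($role)"
    )
    cur.execute(
        "grant select on future iceberg tables in database identifier($db) "
        "to role identifier($role)"
    )
```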
50 changes: 48 additions & 2 deletions metaphor/snowflake/extractor.py
@@ -56,6 +56,8 @@
     HierarchyLogicalID,
     QueryLog,
     SchemaType,
+    SnowflakeIcebergInfo,
+    SnowflakeIcebergTableType,
     SnowflakeStreamInfo,
     SourceInfo,
     SQLSchema,
Expand Down Expand Up @@ -165,9 +167,10 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
f"Failed to fetch table extra info for '{database}'\n{e}"
)

if self._streams_enabled:
for schema in self._fetch_schemas(cursor):
for schema in self._fetch_schemas(cursor):
if self._streams_enabled:
self._fetch_streams(cursor, database, schema)
self._fetch_iceberg_tables(cursor, database, schema)

self._fetch_primary_keys(cursor)
self._fetch_unique_keys(cursor)
Expand Down Expand Up @@ -709,6 +712,49 @@ def _to_dataset_eid(x: str) -> str:

logger.info(f"Found {count} stream tables in {database}.{schema}")

def _fetch_iceberg_tables(
self, cursor: SnowflakeCursor, database: str, schema: str
):
try:
cursor.execute(f"SHOW ICEBERG TABLES IN {schema}")
except Exception:
# Most likely due to a permission issue
logger.exception(f"Failed to show iceberg tables in '{schema}'")
return

count = 0
for entry in cursor:
(
table_name,
external_volume_name,
iceberg_table_type,
) = (
entry[1],
entry[5],
entry[7],
)

normalized_name = dataset_normalized_name(database, schema, table_name)
dataset = self._datasets.get(normalized_name)

if dataset is None:
logger.warning(f"not able to find dataset, {normalized_name}")
continue

dataset.snowflake_iceberg_info = SnowflakeIcebergInfo(
external_volume_name=external_volume_name,
iceberg_table_type=(
SnowflakeIcebergTableType.UNKNOWN
if iceberg_table_type
not in (item.value for item in SnowflakeIcebergTableType)
else SnowflakeIcebergTableType[iceberg_table_type]
),
)

count += 1

logger.info(f"Found {count} iceberg tables in {database}.{schema}")

def _fetch_stream_row_count(self, stream_name) -> Optional[int]:
with self._conn.cursor() as cursor:
try:
Expand Down
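The conditional in the new method guards the enum conversion so that type strings the model doesn't recognize degrade to UNKNOWN instead of raising. A self-contained sketch of that pattern, using a hypothetical stand-in enum (only MANAGED and UNKNOWN appear in this change; the UNMANAGED member is an assumption, and the real enum ships in metaphor-models 0.40.2):

```
from enum import Enum


# Hypothetical stand-in for SnowflakeIcebergTableType from metaphor-models.
class IcebergTableType(Enum):
    MANAGED = "MANAGED"
    UNMANAGED = "UNMANAGED"  # assumed member
    UNKNOWN = "UNKNOWN"


def coerce_table_type(raw: str) -> IcebergTableType:
    # Same guard as the extractor: unknown strings degrade to UNKNOWN
    # instead of raising, so new Snowflake table types can't break a run.
    if raw not in (item.value for item in IcebergTableType):
        return IcebergTableType.UNKNOWN
    return IcebergTableType[raw]  # name lookup, as in the extractor


assert coerce_table_type("MANAGED") is IcebergTableType.MANAGED
assert coerce_table_type("SOMETHING_NEW") is IcebergTableType.UNKNOWN
```

Note that the name lookup on the last line only mirrors the value check because member names equal their values, which the generated metaphor-models enum presumably guarantees.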
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.14.111"
+version = "0.14.112"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor <[email protected]>"]
Expand Down Expand Up @@ -42,7 +42,7 @@ llama-index-readers-confluence = { version = "^0.1.4", optional = true }
llama-index-readers-notion = { version = "^0.1.6", optional = true }
looker-sdk = { version = "^24.2.0", optional = true }
lxml = { version = "~=5.0.0", optional = true }
metaphor-models = "0.40.1"
metaphor-models = "0.40.2"
more-itertools = { version = "^10.1.0", optional = true }
msal = { version = "^1.28.0", optional = true }
msgraph-beta-sdk = { version = "~1.4.0", optional = true }
45 changes: 45 additions & 0 deletions tests/snowflake/test_extractor.py
@@ -14,6 +14,8 @@
     MaterializationType,
     QueriedDataset,
     SchemaField,
+    SnowflakeIcebergInfo,
+    SnowflakeIcebergTableType,
     SnowflakeStreamInfo,
     SnowflakeStreamSourceType,
     SnowflakeStreamType,
Expand Down Expand Up @@ -1178,3 +1180,46 @@ def test_fetch_schema_comment(mock_connect: MagicMock) -> None:
assert hierarchy
assert hierarchy.system_description.description == "desc"
assert hierarchy.system_description.platform == AssetPlatform.SNOWFLAKE


@patch("metaphor.snowflake.auth.connect")
def test_fetch_iceberg_table(mock_connect: MagicMock) -> None:
mock_cursor = MagicMock()

mock_cursor.__iter__.return_value = iter(
[
(
"dont care",
"TABLE", # stream_name,
"dont care",
"dont care",
"dont care",
"external_volume", # external_volume_name
"dont care",
"MANAGED", # table_type
),
]
)

extractor = SnowflakeExtractor(make_snowflake_config())
extractor._datasets["db.schema.table"] = extractor._init_dataset(
database="DB",
schema="SCHEMA",
table="TABLE",
table_type="BASE TABLE",
comment="",
row_count=None,
table_bytes=None,
)

extractor._fetch_iceberg_tables(mock_cursor, "DB", "SCHEMA")

normalized_name = dataset_normalized_name("DB", "SCHEMA", "TABLE")
assert normalized_name in extractor._datasets
assert len(extractor._datasets) == 1

dataset = extractor._datasets[normalized_name]
assert dataset.snowflake_iceberg_info == SnowflakeIcebergInfo(
iceberg_table_type=SnowflakeIcebergTableType.MANAGED,
external_volume_name="external_volume",
)
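The mocked tuple mirrors the positional columns `_fetch_iceberg_tables` reads from a `SHOW ICEBERG TABLES` row: index 1 for the table name, 5 for the external volume, 7 for the table type. A tiny sketch of that assumption (the meaning of the filler positions is not documented in this change):

```
# Positional layout assumed by _fetch_iceberg_tables; only indices
# 1, 5, and 7 are actually read, the rest are "dont care" fillers.
row = (
    "dont care",        # 0
    "TABLE",            # 1: table name
    "dont care",        # 2
    "dont care",        # 3
    "dont care",        # 4
    "external_volume",  # 5: external volume name
    "dont care",        # 6
    "MANAGED",          # 7: iceberg table type
)
table_name, external_volume_name, iceberg_table_type = row[1], row[5], row[7]
```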
3 changes: 2 additions & 1 deletion tests/test_utils.py
@@ -16,7 +16,8 @@ def load_json(path, events: Optional[List[Dict[str, Any]]] = None):
     """
     if events:
         with open(path, "w") as f:
-            f.write(json.dumps(events, indent=4))
+            f.write(json.dumps(events, indent=2, sort_keys=True))
+            f.write("\n")
     with open(path, "r") as f:
         return json.load(f)

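With `sort_keys=True` and a fixed indent, regenerating an expected-JSON file is deterministic regardless of the order in which the connector assembled the events. A quick illustration of why sorted keys keep the golden files stable:

```
import json

# Key order in the input no longer matters; output is byte-for-byte stable.
events = [{"logicalId": {"name": "db.schema.table"}, "entityType": "DATASET"}]

print(json.dumps(events, indent=2, sort_keys=True))
# [
#   {
#     "entityType": "DATASET",
#     "logicalId": {
#       "name": "db.schema.table"
#     }
#   }
# ]
```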
