Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract iceberg table #993

Merged
merged 10 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions metaphor/snowflake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ grant references on future materialized views in database identifier($db) to rol
-- (Optional) Grant privilege to "show streams"
grant select on all streams in database identifier($db) to role identifier($role);
grant select on future streams in database identifier($db) to role identifier($role);

-- (Optional) Grant privilege to "show iceberg tables"
grant select on all iceberg tables in database identifier($db) to role identifier($role);
grant select on future iceberg tables in database identifier($db) to role identifier($role);
```

### Key Pair Authentication (Optional)
Expand Down
50 changes: 48 additions & 2 deletions metaphor/snowflake/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
HierarchyLogicalID,
QueryLog,
SchemaType,
SnowflakeIcebergInfo,
SnowflakeIcebergTableType,
SnowflakeStreamInfo,
SourceInfo,
SQLSchema,
Expand Down Expand Up @@ -165,9 +167,10 @@
f"Failed to fetch table extra info for '{database}'\n{e}"
)

if self._streams_enabled:
for schema in self._fetch_schemas(cursor):
for schema in self._fetch_schemas(cursor):
if self._streams_enabled:

Check warning on line 171 in metaphor/snowflake/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/snowflake/extractor.py#L170-L171

Added lines #L170 - L171 were not covered by tests
self._fetch_streams(cursor, database, schema)
self._fetch_iceberg_tables(cursor, database, schema)

Check warning on line 173 in metaphor/snowflake/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/snowflake/extractor.py#L173

Added line #L173 was not covered by tests

self._fetch_primary_keys(cursor)
self._fetch_unique_keys(cursor)
Expand Down Expand Up @@ -709,6 +712,49 @@

logger.info(f"Found {count} stream tables in {database}.{schema}")

def _fetch_iceberg_tables(
    self, cursor: SnowflakeCursor, database: str, schema: str
):
    """Annotate already-collected datasets with Snowflake Iceberg metadata.

    Runs ``SHOW ICEBERG TABLES IN <schema>`` and, for every returned row
    that matches a dataset previously registered in ``self._datasets``,
    attaches a ``SnowflakeIcebergInfo`` (external volume + table type).

    Args:
        cursor: An open Snowflake cursor to execute the SHOW command on.
        database: Database name, used only to build the normalized
            dataset name and for logging.
        schema: Schema to enumerate iceberg tables in.
    """
    try:
        cursor.execute(f"SHOW ICEBERG TABLES IN {schema}")
    except Exception:
        # Most likely due to a permission issue; treat as best-effort
        # and skip this schema rather than failing the whole run.
        logger.exception(f"Failed to show iceberg tables in '{schema}'")
        return

    count = 0
    for entry in cursor:
        # SHOW ICEBERG TABLES row layout (positional):
        #   [1] table name, [5] external volume name, [7] table type.
        # NOTE(review): column positions assumed from the SHOW output
        # format — confirm against Snowflake docs if the layout changes.
        (
            table_name,
            external_volume_name,
            iceberg_table_type,
        ) = (
            entry[1],
            entry[5],
            entry[7],
        )

        normalized_name = dataset_normalized_name(database, schema, table_name)
        dataset = self._datasets.get(normalized_name)

        if dataset is None:
            logger.warning(f"not able to find dataset, {normalized_name}")
            continue

        # Map the raw table-type string onto the enum by member name,
        # falling back to UNKNOWN for unrecognized values. (The previous
        # version tested membership by *value* but indexed by *name*,
        # which would raise KeyError for any member whose value differs
        # from its name; a guarded name lookup is consistent.)
        try:
            table_type = SnowflakeIcebergTableType[iceberg_table_type]
        except KeyError:
            table_type = SnowflakeIcebergTableType.UNKNOWN

        dataset.snowflake_iceberg_info = SnowflakeIcebergInfo(
            external_volume_name=external_volume_name,
            iceberg_table_type=table_type,
        )

        count += 1

    logger.info(f"Found {count} iceberg tables in {database}.{schema}")

def _fetch_stream_row_count(self, stream_name) -> Optional[int]:
with self._conn.cursor() as cursor:
try:
Expand Down
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.111"
version = "0.14.112"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down Expand Up @@ -42,7 +42,7 @@ llama-index-readers-confluence = { version = "^0.1.4", optional = true }
llama-index-readers-notion = { version = "^0.1.6", optional = true }
looker-sdk = { version = "^24.2.0", optional = true }
lxml = { version = "~=5.0.0", optional = true }
metaphor-models = "0.40.1"
metaphor-models = "0.40.2"
more-itertools = { version = "^10.1.0", optional = true }
msal = { version = "^1.28.0", optional = true }
msgraph-beta-sdk = { version = "~1.4.0", optional = true }
Expand Down
45 changes: 45 additions & 0 deletions tests/snowflake/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
MaterializationType,
QueriedDataset,
SchemaField,
SnowflakeIcebergInfo,
SnowflakeIcebergTableType,
SnowflakeStreamInfo,
SnowflakeStreamSourceType,
SnowflakeStreamType,
Expand Down Expand Up @@ -1178,3 +1180,46 @@ def test_fetch_schema_comment(mock_connect: MagicMock) -> None:
assert hierarchy
assert hierarchy.system_description.description == "desc"
assert hierarchy.system_description.platform == AssetPlatform.SNOWFLAKE


@patch("metaphor.snowflake.auth.connect")
def test_fetch_iceberg_table(mock_connect: MagicMock) -> None:
    """A SHOW ICEBERG TABLES row should attach iceberg info to its dataset."""
    # Single fake SHOW ICEBERG TABLES result row; only positions 1, 5,
    # and 7 (name, external volume, table type) are read by the extractor.
    fake_row = (
        "dont care",
        "TABLE",  # stream_name,
        "dont care",
        "dont care",
        "dont care",
        "external_volume",  # external_volume_name
        "dont care",
        "MANAGED",  # table_type
    )
    cursor = MagicMock()
    cursor.__iter__.return_value = iter([fake_row])

    extractor = SnowflakeExtractor(make_snowflake_config())
    # Pre-register the dataset the fake row refers to.
    extractor._datasets["db.schema.table"] = extractor._init_dataset(
        database="DB",
        schema="SCHEMA",
        table="TABLE",
        table_type="BASE TABLE",
        comment="",
        row_count=None,
        table_bytes=None,
    )

    extractor._fetch_iceberg_tables(cursor, "DB", "SCHEMA")

    key = dataset_normalized_name("DB", "SCHEMA", "TABLE")
    assert key in extractor._datasets
    assert len(extractor._datasets) == 1

    ds = extractor._datasets[key]
    expected = SnowflakeIcebergInfo(
        iceberg_table_type=SnowflakeIcebergTableType.MANAGED,
        external_volume_name="external_volume",
    )
    assert ds.snowflake_iceberg_info == expected
3 changes: 2 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ def load_json(path, events: Optional[List[Dict[str, Any]]] = None):
"""
if events:
with open(path, "w") as f:
f.write(json.dumps(events, indent=4))
f.write(json.dumps(events, indent=2, sort_keys=True))
f.write("\n")
with open(path, "r") as f:
return json.load(f)

Expand Down
Loading