Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snowflake source info last updated missing [sc-29859] #1048

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions metaphor/snowflake/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
logger.info("Fetching metadata from Snowflake")

self._conn = auth.connect(self._config)
logger.info("Connected to Snowflake")

Check warning on line 140 in metaphor/snowflake/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/snowflake/extractor.py#L140

Added line #L140 was not covered by tests

with self._conn:
cursor = self._conn.cursor()
Expand Down Expand Up @@ -333,6 +334,10 @@
is_shared_database: bool,
secure_views: Set[str],
) -> None:
# shared database doesn't support getting DDL and last update time
if is_shared_database:
return

Check warning on line 339 in metaphor/snowflake/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/snowflake/extractor.py#L339

Added line #L339 was not covered by tests

dict_cursor: DictCursor = self._conn.cursor(DictCursor) # type: ignore

# Partition table by schema
Expand All @@ -343,29 +348,30 @@
for partitioned_tables in schema_tables.values():
for chunk in chunks(partitioned_tables, TABLE_INFO_FETCH_SIZE):
try:
self._fetch_table_info_internal(
dict_cursor, chunk, is_shared_database, secure_views
)
self._fetch_last_update_time(dict_cursor, chunk, secure_views)
except Exception as error:
logger.error(error)

try:
self._fetch_table_ddl(dict_cursor, chunk)
except Exception as error:
logger.error(error)

dict_cursor.close()

def _fetch_table_info_internal(
def _fetch_last_update_time(
self,
dict_cursor: DictCursor,
tables: List[Tuple[str, DatasetInfo]],
is_shared_database: bool,
secure_views: Set[str],
) -> None:
queries, params = [], []
ddl_tables, updated_time_tables = [], []
updated_time_tables = []
for normalized_name, table in tables:
fullname = to_quoted_identifier([table.database, table.schema, table.name])
# fetch last_update_time and DDL for tables, and fetch only DDL for views
# fetch last_update_time for tables
if (
table.type == SnowflakeTableType.BASE_TABLE.value
and not is_shared_database
and normalized_name not in secure_views
):
queries.append(
Expand All @@ -374,12 +380,6 @@
params.append(fullname)
updated_time_tables.append(normalized_name)

# shared database doesn't support getting DDL
if not is_shared_database:
queries.append(f"get_ddl('table', %s) as \"DDL_{normalized_name}\"")
params.append(fullname)
ddl_tables.append(normalized_name)

if not queries:
return
query = f"SELECT {','.join(queries)}"
Expand All @@ -389,12 +389,6 @@
results = dict_cursor.fetchone()
assert isinstance(results, Mapping)

for normalized_name in ddl_tables:
dataset = self._datasets[normalized_name]
assert dataset.schema is not None and dataset.schema.sql_schema is not None

dataset.schema.sql_schema.table_schema = results[f"DDL_{normalized_name}"]

for normalized_name in updated_time_tables:
dataset = self._datasets[normalized_name]
assert dataset.schema.sql_schema is not None
Expand All @@ -407,6 +401,32 @@
timestamp / 1_000_000_000
)

def _fetch_table_ddl(
    self, dict_cursor: DictCursor, tables: List[Tuple[str, DatasetInfo]]
) -> None:
    """Fetch the DDL text of each table in one query and store it on the
    corresponding dataset's SQL schema.

    Builds a single ``SELECT get_ddl('table', %s) as "DDL_<name>", ...``
    statement so all DDLs come back in one round trip.
    """
    # Resolve each table to (normalized name, fully-quoted identifier) up front.
    resolved = [
        (name, to_quoted_identifier([tbl.database, tbl.schema, tbl.name]))
        for name, tbl in tables
    ]

    # Nothing to fetch for an empty chunk.
    if not resolved:
        return

    select_items = [
        f"get_ddl('table', %s) as \"DDL_{name}\"" for name, _ in resolved
    ]
    params = [fullname for _, fullname in resolved]

    query = f"SELECT {','.join(select_items)}"
    logger.debug(f"{query}, params: {params}")

    dict_cursor.execute(query, tuple(params))
    row = dict_cursor.fetchone()
    # DictCursor returns a mapping keyed by the column aliases above.
    assert isinstance(row, Mapping)

    for name, _ in resolved:
        dataset = self._datasets[name]
        assert dataset.schema is not None and dataset.schema.sql_schema is not None
        dataset.schema.sql_schema.table_schema = row[f"DDL_{name}"]

def _fetch_unique_keys(self, cursor: SnowflakeCursor) -> None:
cursor.execute("SHOW UNIQUE KEYS")

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.159"
version = "0.14.160"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
12 changes: 8 additions & 4 deletions tests/snowflake/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,14 @@ def test_fetch_table_info(mock_connect: MagicMock):

mock_conn.cursor.return_value = mock_cursor

mock_cursor.fetchone.return_value = {
f"DDL_{normalized_name}": "ddl",
f"UPDATED_{normalized_name}": 1719327434000000000,
}
mock_cursor.fetchone.side_effect = [
{
f"UPDATED_{normalized_name}": 1719327434000000000,
},
{
f"DDL_{normalized_name}": "ddl",
},
]

extractor = SnowflakeExtractor(make_snowflake_config())

Expand Down
Loading