Skip to content

Commit

Permalink
snowflake source info last updated missing [sc-29859] (#1048)
Browse files Browse the repository at this point in the history
  • Loading branch information
alyiwang authored Dec 3, 2024
1 parent a42a0d6 commit e7c3ad9
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 25 deletions.
60 changes: 40 additions & 20 deletions metaphor/snowflake/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
logger.info("Fetching metadata from Snowflake")

self._conn = auth.connect(self._config)
logger.info("Connected to Snowflake")

with self._conn:
cursor = self._conn.cursor()
Expand Down Expand Up @@ -333,6 +334,10 @@ def _fetch_table_info(
is_shared_database: bool,
secure_views: Set[str],
) -> None:
# shared database doesn't support getting DDL and last update time
if is_shared_database:
return

dict_cursor: DictCursor = self._conn.cursor(DictCursor) # type: ignore

# Partition table by schema
Expand All @@ -343,29 +348,30 @@ def _fetch_table_info(
for partitioned_tables in schema_tables.values():
for chunk in chunks(partitioned_tables, TABLE_INFO_FETCH_SIZE):
try:
self._fetch_table_info_internal(
dict_cursor, chunk, is_shared_database, secure_views
)
self._fetch_last_update_time(dict_cursor, chunk, secure_views)
except Exception as error:
logger.error(error)

try:
self._fetch_table_ddl(dict_cursor, chunk)
except Exception as error:
logger.error(error)

dict_cursor.close()

def _fetch_table_info_internal(
def _fetch_last_update_time(
self,
dict_cursor: DictCursor,
tables: List[Tuple[str, DatasetInfo]],
is_shared_database: bool,
secure_views: Set[str],
) -> None:
queries, params = [], []
ddl_tables, updated_time_tables = [], []
updated_time_tables = []
for normalized_name, table in tables:
fullname = to_quoted_identifier([table.database, table.schema, table.name])
# fetch last_update_time and DDL for tables, and fetch only DDL for views
# fetch last_update_time for tables
if (
table.type == SnowflakeTableType.BASE_TABLE.value
and not is_shared_database
and normalized_name not in secure_views
):
queries.append(
Expand All @@ -374,12 +380,6 @@ def _fetch_table_info_internal(
params.append(fullname)
updated_time_tables.append(normalized_name)

# shared database doesn't support getting DDL
if not is_shared_database:
queries.append(f"get_ddl('table', %s) as \"DDL_{normalized_name}\"")
params.append(fullname)
ddl_tables.append(normalized_name)

if not queries:
return
query = f"SELECT {','.join(queries)}"
Expand All @@ -389,12 +389,6 @@ def _fetch_table_info_internal(
results = dict_cursor.fetchone()
assert isinstance(results, Mapping)

for normalized_name in ddl_tables:
dataset = self._datasets[normalized_name]
assert dataset.schema is not None and dataset.schema.sql_schema is not None

dataset.schema.sql_schema.table_schema = results[f"DDL_{normalized_name}"]

for normalized_name in updated_time_tables:
dataset = self._datasets[normalized_name]
assert dataset.schema.sql_schema is not None
Expand All @@ -407,6 +401,32 @@ def _fetch_table_info_internal(
timestamp / 1_000_000_000
)

def _fetch_table_ddl(
self, dict_cursor: DictCursor, tables: List[Tuple[str, DatasetInfo]]
) -> None:
queries, params = [], []
ddl_tables = []
for normalized_name, table in tables:
fullname = to_quoted_identifier([table.database, table.schema, table.name])
queries.append(f"get_ddl('table', %s) as \"DDL_{normalized_name}\"")
params.append(fullname)
ddl_tables.append(normalized_name)

if not queries:
return
query = f"SELECT {','.join(queries)}"
logger.debug(f"{query}, params: {params}")

dict_cursor.execute(query, tuple(params))
results = dict_cursor.fetchone()
assert isinstance(results, Mapping)

for normalized_name in ddl_tables:
dataset = self._datasets[normalized_name]
assert dataset.schema is not None and dataset.schema.sql_schema is not None

dataset.schema.sql_schema.table_schema = results[f"DDL_{normalized_name}"]

def _fetch_unique_keys(self, cursor: SnowflakeCursor) -> None:
cursor.execute("SHOW UNIQUE KEYS")

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.159"
version = "0.14.160"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
12 changes: 8 additions & 4 deletions tests/snowflake/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,14 @@ def test_fetch_table_info(mock_connect: MagicMock):

mock_conn.cursor.return_value = mock_cursor

mock_cursor.fetchone.return_value = {
f"DDL_{normalized_name}": "ddl",
f"UPDATED_{normalized_name}": 1719327434000000000,
}
mock_cursor.fetchone.side_effect = [
{
f"UPDATED_{normalized_name}": 1719327434000000000,
},
{
f"DDL_{normalized_name}": "ddl",
},
]

extractor = SnowflakeExtractor(make_snowflake_config())

Expand Down

0 comments on commit e7c3ad9

Please sign in to comment.