From e7c3ad9a3e1914f3eb0a23c3a58eeba3fd80ac92 Mon Sep 17 00:00:00 2001
From: "Yi (Alan) Wang"
Date: Tue, 3 Dec 2024 06:27:33 -0800
Subject: [PATCH] snowflake source info last updated missing [sc-29859] (#1048)

---
 metaphor/snowflake/extractor.py   | 60 ++++++++++++++++++++-----------
 pyproject.toml                    |  2 +-
 tests/snowflake/test_extractor.py | 12 ++++---
 3 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/metaphor/snowflake/extractor.py b/metaphor/snowflake/extractor.py
index 534e5056..59938f61 100644
--- a/metaphor/snowflake/extractor.py
+++ b/metaphor/snowflake/extractor.py
@@ -137,6 +137,7 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
         logger.info("Fetching metadata from Snowflake")
 
         self._conn = auth.connect(self._config)
+        logger.info("Connected to Snowflake")
 
         with self._conn:
             cursor = self._conn.cursor()
@@ -333,6 +334,10 @@ def _fetch_table_info(
         is_shared_database: bool,
         secure_views: Set[str],
     ) -> None:
+        # shared database doesn't support getting DDL and last update time
+        if is_shared_database:
+            return
+
         dict_cursor: DictCursor = self._conn.cursor(DictCursor)  # type: ignore
 
         # Partition table by schema
@@ -343,29 +348,30 @@ def _fetch_table_info(
         for partitioned_tables in schema_tables.values():
             for chunk in chunks(partitioned_tables, TABLE_INFO_FETCH_SIZE):
                 try:
-                    self._fetch_table_info_internal(
-                        dict_cursor, chunk, is_shared_database, secure_views
-                    )
+                    self._fetch_last_update_time(dict_cursor, chunk, secure_views)
+                except Exception as error:
+                    logger.error(error)
+
+                try:
+                    self._fetch_table_ddl(dict_cursor, chunk)
                 except Exception as error:
                     logger.error(error)
 
         dict_cursor.close()
 
-    def _fetch_table_info_internal(
+    def _fetch_last_update_time(
         self,
         dict_cursor: DictCursor,
         tables: List[Tuple[str, DatasetInfo]],
-        is_shared_database: bool,
         secure_views: Set[str],
     ) -> None:
         queries, params = [], []
-        ddl_tables, updated_time_tables = [], []
+        updated_time_tables = []
         for normalized_name, table in tables:
             fullname = to_quoted_identifier([table.database, table.schema, table.name])
-            # fetch last_update_time and DDL for tables, and fetch only DDL for views
+            # fetch last_update_time for tables
             if (
                 table.type == SnowflakeTableType.BASE_TABLE.value
-                and not is_shared_database
                 and normalized_name not in secure_views
             ):
                 queries.append(
@@ -374,12 +380,6 @@ def _fetch_table_info_internal(
                 params.append(fullname)
                 updated_time_tables.append(normalized_name)
 
-            # shared database doesn't support getting DDL
-            if not is_shared_database:
-                queries.append(f"get_ddl('table', %s) as \"DDL_{normalized_name}\"")
-                params.append(fullname)
-                ddl_tables.append(normalized_name)
-
         if not queries:
             return
         query = f"SELECT {','.join(queries)}"
@@ -389,12 +389,6 @@ def _fetch_table_info_internal(
         results = dict_cursor.fetchone()
         assert isinstance(results, Mapping)
 
-        for normalized_name in ddl_tables:
-            dataset = self._datasets[normalized_name]
-            assert dataset.schema is not None and dataset.schema.sql_schema is not None
-
-            dataset.schema.sql_schema.table_schema = results[f"DDL_{normalized_name}"]
-
         for normalized_name in updated_time_tables:
             dataset = self._datasets[normalized_name]
             assert dataset.schema.sql_schema is not None
@@ -407,6 +401,32 @@ def _fetch_table_info_internal(
                     timestamp / 1_000_000_000
                 )
 
+    def _fetch_table_ddl(
+        self, dict_cursor: DictCursor, tables: List[Tuple[str, DatasetInfo]]
+    ) -> None:
+        queries, params = [], []
+        ddl_tables = []
+        for normalized_name, table in tables:
+            fullname = to_quoted_identifier([table.database, table.schema, table.name])
+            queries.append(f"get_ddl('table', %s) as \"DDL_{normalized_name}\"")
+            params.append(fullname)
+            ddl_tables.append(normalized_name)
+
+        if not queries:
+            return
+        query = f"SELECT {','.join(queries)}"
+        logger.debug(f"{query}, params: {params}")
+
+        dict_cursor.execute(query, tuple(params))
+        results = dict_cursor.fetchone()
+        assert isinstance(results, Mapping)
+
+        for normalized_name in ddl_tables:
+            dataset = self._datasets[normalized_name]
+            assert dataset.schema is not None and dataset.schema.sql_schema is not None
+
+            dataset.schema.sql_schema.table_schema = results[f"DDL_{normalized_name}"]
+
     def _fetch_unique_keys(self, cursor: SnowflakeCursor) -> None:
         cursor.execute("SHOW UNIQUE KEYS")
 
diff --git a/pyproject.toml b/pyproject.toml
index e62c221b..5481410b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.14.159"
+version = "0.14.160"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor "]
diff --git a/tests/snowflake/test_extractor.py b/tests/snowflake/test_extractor.py
index 85adeaaa..66866c80 100644
--- a/tests/snowflake/test_extractor.py
+++ b/tests/snowflake/test_extractor.py
@@ -207,10 +207,14 @@ def test_fetch_table_info(mock_connect: MagicMock):
 
     mock_conn.cursor.return_value = mock_cursor
 
-    mock_cursor.fetchone.return_value = {
-        f"DDL_{normalized_name}": "ddl",
-        f"UPDATED_{normalized_name}": 1719327434000000000,
-    }
+    mock_cursor.fetchone.side_effect = [
+        {
+            f"UPDATED_{normalized_name}": 1719327434000000000,
+        },
+        {
+            f"DDL_{normalized_name}": "ddl",
+        },
+    ]
 
     extractor = SnowflakeExtractor(make_snowflake_config())
 