diff --git a/metaphor/snowflake/config.py b/metaphor/snowflake/config.py index f7efa0b5..3400a6f6 100644 --- a/metaphor/snowflake/config.py +++ b/metaphor/snowflake/config.py @@ -55,6 +55,9 @@ class SnowflakeBaseConfig(SnowflakeAuthConfig): # Max number of concurrent queries to database max_concurrency: int = DEFAULT_THREAD_POOL_SIZE + # The fully qualified schema that contains all the account_usage views + account_usage_schema: str = "SNOWFLAKE.ACCOUNT_USAGE" + @dataclass(config=ConnectorConfig) class SnowflakeConfig(SnowflakeBaseConfig): @@ -70,6 +73,3 @@ class SnowflakeConfig(SnowflakeBaseConfig): streams: SnowflakeStreamsConfig = field( default_factory=lambda: SnowflakeStreamsConfig() ) - - # The fully qualified schema that contains all the account_usage views - account_usage_schema: str = "SNOWFLAKE.ACCOUNT_USAGE" diff --git a/metaphor/snowflake/extractor.py b/metaphor/snowflake/extractor.py index 34d4469b..6e5d19e5 100644 --- a/metaphor/snowflake/extractor.py +++ b/metaphor/snowflake/extractor.py @@ -581,11 +581,14 @@ def _fetch_query_logs(self) -> Iterator[QueryLog]: start_date = start_of_day(self._query_log_lookback_days) end_date = start_of_day() - has_access_history = check_access_history(self._conn) + has_access_history = check_access_history( + self._conn, self._account_usage_schema + ) logger.info(f"Using Snowflake Enterprise edition: {has_access_history}") count = fetch_query_history_count( self._conn, + self._account_usage_schema, start_date, self._query_log_excluded_usernames, end_date, diff --git a/metaphor/snowflake/lineage/extractor.py b/metaphor/snowflake/lineage/extractor.py index 779ff931..d6ddb731 100644 --- a/metaphor/snowflake/lineage/extractor.py +++ b/metaphor/snowflake/lineage/extractor.py @@ -59,6 +59,7 @@ def __init__(self, config: SnowflakeLineageRunConfig): self._enable_view_lineage = config.enable_view_lineage self._enable_lineage_from_history = config.enable_lineage_from_history self._include_self_lineage = config.include_self_lineage + self._account_usage_schema = config.account_usage_schema self._config = config self._datasets: Dict[str, Dataset] = {} @@ -76,10 +77,10 @@ async def extract(self) -> Collection[ENTITY_TYPES]: logger.info("Fetching access and query history") # Join QUERY_HISTORY & ACCESS_HISTORY to include only queries that succeeded. cursor.execute( - """ + f""" SELECT COUNT(*) - FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q - JOIN SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY a + FROM {self._account_usage_schema}.QUERY_HISTORY q + JOIN {self._account_usage_schema}.ACCESS_HISTORY a ON q.QUERY_ID = a.QUERY_ID WHERE q.EXECUTION_STATUS = 'SUCCESS' AND ARRAY_SIZE(a.DIRECT_OBJECTS_ACCESSED) > 0 @@ -100,8 +101,8 @@ async def extract(self) -> Collection[ENTITY_TYPES]: str(x): QueryWithParam( f""" SELECT a.DIRECT_OBJECTS_ACCESSED, a.OBJECTS_MODIFIED, q.QUERY_TEXT - FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q - JOIN SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY a + FROM {self._account_usage_schema}.QUERY_HISTORY q + JOIN {self._account_usage_schema}.ACCESS_HISTORY a ON q.QUERY_ID = a.QUERY_ID WHERE q.EXECUTION_STATUS = 'SUCCESS' AND ARRAY_SIZE(a.DIRECT_OBJECTS_ACCESSED) > 0 diff --git a/metaphor/snowflake/utils.py b/metaphor/snowflake/utils.py index 1c9e3045..69dd8fc3 100644 --- a/metaphor/snowflake/utils.py +++ b/metaphor/snowflake/utils.py @@ -156,15 +156,16 @@ def exclude_username_clause(excluded_usernames: Set[str]) -> str: def check_access_history( conn: SnowflakeConnection, + account_usage_schema: str, ) -> bool: """ Check if access history table is available """ cursor = conn.cursor() cursor.execute( - """ + f""" SELECT QUERY_ID - FROM SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY + FROM {account_usage_schema}.ACCESS_HISTORY LIMIT 1 """ ) @@ -174,6 +175,7 @@ def check_access_history( def fetch_query_history_count( conn: SnowflakeConnection, + account_usage_schema: str, start_date: datetime, excluded_usernames: Set[str], end_date: datetime = datetime.now(), @@ -187,8 +189,8 @@ def fetch_query_history_count( cursor.execute( f""" SELECT COUNT(1) - FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q - JOIN SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY a + FROM {account_usage_schema}.QUERY_HISTORY q + JOIN {account_usage_schema}.ACCESS_HISTORY a ON a.QUERY_ID = q.QUERY_ID WHERE EXECUTION_STATUS = 'SUCCESS' and START_TIME > %s and START_TIME <= %s diff --git a/pyproject.toml b/pyproject.toml index 3b6d2cdc..2fa37e28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.77" +version = "0.14.78" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "]