From a7d41df76ebdb57869df24de6efa92cbb1f714fd Mon Sep 17 00:00:00 2001 From: Mars Lan Date: Sun, 4 Aug 2024 05:30:56 -0700 Subject: [PATCH] Add account_usage config to query a alternate schema --- metaphor/snowflake/README.md | 8 ++++++++ metaphor/snowflake/config.py | 3 +++ metaphor/snowflake/extractor.py | 11 ++++++----- pyproject.toml | 2 +- tests/snowflake/config.yml | 1 + tests/snowflake/test_config.py | 1 + 6 files changed, 20 insertions(+), 6 deletions(-) diff --git a/metaphor/snowflake/README.md b/metaphor/snowflake/README.md index 0ad0f9ff..6225deb9 100644 --- a/metaphor/snowflake/README.md +++ b/metaphor/snowflake/README.md @@ -110,6 +110,14 @@ See [Output Config](../common/docs/output.md) for more information. See [Filter Config](../common/docs/filter.md) for more information on the optional `filter` config. +#### Alternative ACCOUNT_USAGE Schema + +By default, the connector will read the [QUERY_HISTORY](https://docs.snowflake.com/en/sql-reference/account-usage/query_history), [ACCESS_HISTORY](https://docs.snowflake.com/en/sql-reference/account-usage/access_history), and [TAG_REFERENCES](https://docs.snowflake.com/en/sql-reference/account-usage/tag_references) views from [SNOWFLAKE.ACCOUNT_USAGE](https://docs.snowflake.com/en/sql-reference/account-usage) schema. If you do not wish to grant read access to the entire SNOWFLAKE database, you can mirror these views to a different schema and ask the connector to read from it instead: + +```yaml +account_usage_schema: . +``` + #### Tag Assignment See [Tag Matcher Config](../common/docs/tag_matcher.md) for more information on the optional `tag_matcher` config. diff --git a/metaphor/snowflake/config.py b/metaphor/snowflake/config.py index ca313d7d..f7efa0b5 100644 --- a/metaphor/snowflake/config.py +++ b/metaphor/snowflake/config.py @@ -70,3 +70,6 @@ class SnowflakeConfig(SnowflakeBaseConfig): streams: SnowflakeStreamsConfig = field( default_factory=lambda: SnowflakeStreamsConfig() ) + + # The fully qualified schema that contains all the account_usage views + account_usage_schema: str = "SNOWFLAKE.ACCOUNT_USAGE" diff --git a/metaphor/snowflake/extractor.py b/metaphor/snowflake/extractor.py index b9ccbde1..34d4469b 100644 --- a/metaphor/snowflake/extractor.py +++ b/metaphor/snowflake/extractor.py @@ -111,6 +111,7 @@ def __init__(self, config: SnowflakeConfig): self._account = normalize_snowflake_account(config.account) self._filter = config.filter.normalize().merge(DEFAULT_FILTER) self._tag_matchers = config.tag_matchers + self._account_usage_schema = config.account_usage_schema self._query_log_excluded_usernames = config.query_log.excluded_usernames self._query_log_lookback_days = config.query_log.lookback_days self._query_log_fetch_size = config.query_log.fetch_size @@ -506,9 +507,9 @@ def _add_hierarchy_system_tag( hierarchy.system_tags = SystemTags(tags=[system_tag]) def _fetch_tags(self, cursor: SnowflakeCursor) -> None: - query = """ + query = f""" SELECT TAG_NAME, TAG_VALUE, DOMAIN, OBJECT_DATABASE, OBJECT_SCHEMA, OBJECT_NAME, COLUMN_NAME - FROM snowflake.account_usage.tag_references + FROM {self._account_usage_schema}.TAG_REFERENCES WHERE DOMAIN in ('TABLE', 'COLUMN', 'DATABASE', 'SCHEMA') AND OBJECT_DELETED IS NULL ORDER BY case when DOMAIN = 'DATABASE' then 1 @@ -727,8 +728,8 @@ def _batch_query_for_access_logs( q.USER_NAME, QUERY_TEXT, START_TIME, TOTAL_ELAPSED_TIME, CREDITS_USED_CLOUD_SERVICES, DATABASE_NAME, SCHEMA_NAME, BYTES_SCANNED, BYTES_WRITTEN, ROWS_PRODUCED, ROWS_INSERTED, ROWS_UPDATED, DIRECT_OBJECTS_ACCESSED, OBJECTS_MODIFIED - FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q - JOIN SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY a + FROM {self._account_usage_schema}.QUERY_HISTORY q + JOIN {self._account_usage_schema}.ACCESS_HISTORY a ON a.QUERY_ID = q.QUERY_ID WHERE EXECUTION_STATUS = 'SUCCESS' AND q.START_TIME > %s AND q.START_TIME <= %s @@ -758,7 +759,7 @@ def _batch_query_for_query_logs( SELECT QUERY_ID, QUERY_PARAMETERIZED_HASH, USER_NAME, QUERY_TEXT, START_TIME, TOTAL_ELAPSED_TIME, CREDITS_USED_CLOUD_SERVICES, DATABASE_NAME, SCHEMA_NAME, BYTES_SCANNED, BYTES_WRITTEN, ROWS_PRODUCED, ROWS_INSERTED, ROWS_UPDATED - FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q + FROM {self._account_usage_schema}.QUERY_HISTORY q WHERE EXECUTION_STATUS = 'SUCCESS' AND START_TIME > %s AND START_TIME <= %s {exclude_username_clause(self._query_log_excluded_usernames)} diff --git a/pyproject.toml b/pyproject.toml index f447177b..b34cff6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.64" +version = "0.14.65" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/snowflake/config.yml b/tests/snowflake/config.yml index 41957c4b..9b0c19c9 100644 --- a/tests/snowflake/config.yml +++ b/tests/snowflake/config.yml @@ -5,6 +5,7 @@ password: password role: role warehouse: warehouse default_database: database +account_usage_schema: db.schema filter: includes: db1: diff --git a/tests/snowflake/test_config.py b/tests/snowflake/test_config.py index cdc9686a..1d7c18c6 100644 --- a/tests/snowflake/test_config.py +++ b/tests/snowflake/test_config.py @@ -17,6 +17,7 @@ def test_yaml_config(test_root_dir): role="role", warehouse="warehouse", default_database="database", + account_usage_schema="db.schema", filter=DatasetFilter( includes={ "db1": {