Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use account_usage_schema config in all places #952

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions metaphor/snowflake/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class SnowflakeBaseConfig(SnowflakeAuthConfig):
# Max number of concurrent queries to database
max_concurrency: int = DEFAULT_THREAD_POOL_SIZE

+ # The fully qualified schema that contains all the account_usage views
+ account_usage_schema: str = "SNOWFLAKE.ACCOUNT_USAGE"


@dataclass(config=ConnectorConfig)
class SnowflakeConfig(SnowflakeBaseConfig):
Expand All @@ -70,6 +73,3 @@ class SnowflakeConfig(SnowflakeBaseConfig):
streams: SnowflakeStreamsConfig = field(
default_factory=lambda: SnowflakeStreamsConfig()
)

- # The fully qualified schema that contains all the account_usage views
- account_usage_schema: str = "SNOWFLAKE.ACCOUNT_USAGE"
5 changes: 4 additions & 1 deletion metaphor/snowflake/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,11 +581,14 @@ def _fetch_query_logs(self) -> Iterator[QueryLog]:
start_date = start_of_day(self._query_log_lookback_days)
end_date = start_of_day()

- has_access_history = check_access_history(self._conn)
+ has_access_history = check_access_history(
+ self._conn, self._account_usage_schema
+ )
logger.info(f"Using Snowflake Enterprise edition: {has_access_history}")

count = fetch_query_history_count(
self._conn,
+ self._account_usage_schema,
start_date,
self._query_log_excluded_usernames,
end_date,
Expand Down
11 changes: 6 additions & 5 deletions metaphor/snowflake/lineage/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(self, config: SnowflakeLineageRunConfig):
self._enable_view_lineage = config.enable_view_lineage
self._enable_lineage_from_history = config.enable_lineage_from_history
self._include_self_lineage = config.include_self_lineage
+ self._account_usage_schema = config.account_usage_schema
self._config = config

self._datasets: Dict[str, Dataset] = {}
Expand All @@ -76,10 +77,10 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
logger.info("Fetching access and query history")
# Join QUERY_HISTORY & ACCESS_HISTORY to include only queries that succeeded.
cursor.execute(
- """
+ f"""
SELECT COUNT(*)
- FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q
- JOIN SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY a
+ FROM {self._account_usage_schema}.QUERY_HISTORY q
+ JOIN {self._account_usage_schema}.ACCESS_HISTORY a
ON q.QUERY_ID = a.QUERY_ID
WHERE q.EXECUTION_STATUS = 'SUCCESS'
AND ARRAY_SIZE(a.DIRECT_OBJECTS_ACCESSED) > 0
Expand All @@ -100,8 +101,8 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
str(x): QueryWithParam(
f"""
SELECT a.DIRECT_OBJECTS_ACCESSED, a.OBJECTS_MODIFIED, q.QUERY_TEXT
- FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q
- JOIN SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY a
+ FROM {self._account_usage_schema}.QUERY_HISTORY q
+ JOIN {self._account_usage_schema}.ACCESS_HISTORY a
ON q.QUERY_ID = a.QUERY_ID
WHERE q.EXECUTION_STATUS = 'SUCCESS'
AND ARRAY_SIZE(a.DIRECT_OBJECTS_ACCESSED) > 0
Expand Down
10 changes: 6 additions & 4 deletions metaphor/snowflake/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,16 @@ def exclude_username_clause(excluded_usernames: Set[str]) -> str:

def check_access_history(
conn: SnowflakeConnection,
+ account_usage_schema: str,
) -> bool:
"""
Check if access history table is available
"""
cursor = conn.cursor()
cursor.execute(
- """
+ f"""
SELECT QUERY_ID
- FROM SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY
+ FROM {account_usage_schema}.ACCESS_HISTORY
LIMIT 1
"""
)
Expand All @@ -174,6 +175,7 @@ def check_access_history(

def fetch_query_history_count(
conn: SnowflakeConnection,
+ account_usage_schema: str,
start_date: datetime,
excluded_usernames: Set[str],
end_date: datetime = datetime.now(),
Expand All @@ -187,8 +189,8 @@ def fetch_query_history_count(
cursor.execute(
f"""
SELECT COUNT(1)
- FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q
- JOIN SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY a
+ FROM {account_usage_schema}.QUERY_HISTORY q
+ JOIN {account_usage_schema}.ACCESS_HISTORY a
ON a.QUERY_ID = q.QUERY_ID
WHERE EXECUTION_STATUS = 'SUCCESS'
and START_TIME > %s and START_TIME <= %s
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
- version = "0.14.77"
+ version = "0.14.78"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
Loading