From 5778f6a53395face54b149257d31d2ef966ff16d Mon Sep 17 00:00:00 2001 From: Tsung-Ju Lii Date: Tue, 7 May 2024 19:19:58 +0800 Subject: [PATCH 1/6] [sc-26213] Use bogus `VALUES` clause for `INSERT INTO` expressions that are too long --- metaphor/snowflake/config.py | 5 +---- metaphor/snowflake/extractor.py | 6 ++++-- metaphor/snowflake/utils.py | 31 +++++++++++++++++++++++++++++++ tests/snowflake/test_utils.py | 14 +++++++++++++- 4 files changed, 49 insertions(+), 7 deletions(-) diff --git a/metaphor/snowflake/config.py b/metaphor/snowflake/config.py index e14fde14..db36deb2 100644 --- a/metaphor/snowflake/config.py +++ b/metaphor/snowflake/config.py @@ -7,14 +7,11 @@ from metaphor.common.filter import DatasetFilter from metaphor.common.tag_matcher import TagMatcher from metaphor.snowflake.auth import SnowflakeAuthConfig -from metaphor.snowflake.utils import DEFAULT_THREAD_POOL_SIZE +from metaphor.snowflake.utils import DEFAULT_MAX_QUERY_SIZE, DEFAULT_THREAD_POOL_SIZE # number of query logs to fetch from Snowflake in one batch DEFAULT_QUERY_LOG_FETCH_SIZE = 100000 -# By default ignore queries larger than 512KiB -DEFAULT_MAX_QUERY_SIZE = 512 * 1024 - @dataclass(config=ConnectorConfig) class SnowflakeQueryLogConfig: diff --git a/metaphor/snowflake/extractor.py b/metaphor/snowflake/extractor.py index 8bd1e6e3..292f946d 100644 --- a/metaphor/snowflake/extractor.py +++ b/metaphor/snowflake/extractor.py @@ -74,6 +74,7 @@ str_to_stream_type, table_type_to_materialization_type, to_quoted_identifier, + truncate_query_text, ) logger = get_logger() @@ -804,7 +805,8 @@ def _parse_query_logs( ) # Skip large queries - if len(query_text) >= self._query_log_max_query_size: + sql_length = len(query_text) + if sql_length > self._query_log_max_query_size: continue # User IDs can be an email address @@ -828,7 +830,7 @@ def _parse_query_logs( bytes_written=safe_float(bytes_written), sources=sources, targets=targets, - sql=query_text, + sql=truncate_query_text(query_text), 
sql_hash=query_hash, ) diff --git a/metaphor/snowflake/utils.py b/metaphor/snowflake/utils.py index 1c9e3045..a4f851de 100644 --- a/metaphor/snowflake/utils.py +++ b/metaphor/snowflake/utils.py @@ -1,4 +1,5 @@ import logging +import re import time from concurrent import futures from dataclasses import dataclass @@ -24,6 +25,11 @@ DEFAULT_THREAD_POOL_SIZE = 10 DEFAULT_SLEEP_TIME = 0.1 # 0.1 s +DEFAULT_MAX_QUERY_SIZE = 1000000 +""" +By default ignore queries longer than 100K characters +""" + class SnowflakeTableType(Enum): """ @@ -270,3 +276,28 @@ def is_target_field(field: SchemaField) -> bool: field = next((f for f in fields if is_target_field(f)), None) if field: _update_field_system_tag(field, system_tag) + + +def truncate_query_text( + sql: str, max_query_length: int = DEFAULT_MAX_QUERY_SIZE +) -> str: + """ + If we encounter an `INSERT INTO` query that's longer than Snowflake's allowed query length in + `QUERY_HISTORY` view, we drop the actual values and add a single all-null row into the table + definition expression. E.g. the truncated query becomes + ```sql + INSERT INTO table (col1, col2, ...) VALUES (NULL, NULL, ...) 
+ ``` + """ + if len(sql) < max_query_length: + return sql + + pattern = r"insert into [^\(]+ \((?P[^\)]+)\) values" + matched = next((re.finditer(pattern, sql, re.IGNORECASE)), None) + if matched is None or matched.span()[0] != 0: + return sql + + column_count = len(matched["columns"].split(",")) + return ( + f"{sql[:matched.span()[-1]]} ({', '.join('NULL' for _ in range(column_count))})" + ) diff --git a/tests/snowflake/test_utils.py b/tests/snowflake/test_utils.py index 3bd141ad..a6a442a7 100644 --- a/tests/snowflake/test_utils.py +++ b/tests/snowflake/test_utils.py @@ -1,7 +1,19 @@ -from metaphor.snowflake.utils import to_quoted_identifier +from metaphor.snowflake.utils import to_quoted_identifier, truncate_query_text def test_to_quoted_identifier(): assert to_quoted_identifier([None, "", "a", "b", "c"]) == '"a"."b"."c"' assert to_quoted_identifier(["db", "sc", 'ta"@BLE']) == '"db"."sc"."ta""@BLE"' + + +def test_truncate_query_text(): + truncated = truncate_query_text( + 'INSERT INTO tb (col1, col2, col3, col4) VALUES ("Hey, I just met you,", "And this is crazy,", "But here\'s my number,", "So call me, maybe?")', # Some joke I saw some time ago: https://stackoverflow.com/questions/2139812/what-is-a-callback + max_query_length=50, + ) + + assert ( + truncated + == "INSERT INTO tb (col1, col2, col3, col4) VALUES (NULL, NULL, NULL, NULL)" + ) From 6ed8135575bac5b06caf74c10d420250a8a39ecc Mon Sep 17 00:00:00 2001 From: Tsung-Ju Lii Date: Tue, 7 May 2024 19:43:37 +0800 Subject: [PATCH 2/6] run test against sf instance, and bump version --- metaphor/snowflake/utils.py | 14 +++++--------- pyproject.toml | 2 +- tests/snowflake/test_utils.py | 3 +-- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/metaphor/snowflake/utils.py b/metaphor/snowflake/utils.py index a4f851de..f2a8fe7d 100644 --- a/metaphor/snowflake/utils.py +++ b/metaphor/snowflake/utils.py @@ -278,23 +278,19 @@ def is_target_field(field: SchemaField) -> bool: _update_field_system_tag(field, 
system_tag) -def truncate_query_text( - sql: str, max_query_length: int = DEFAULT_MAX_QUERY_SIZE -) -> str: +def truncate_query_text(sql: str) -> str: """ - If we encounter an `INSERT INTO` query that's longer than Snowflake's allowed query length in - `QUERY_HISTORY` view, we drop the actual values and add a single all-null row into the table - definition expression. E.g. the truncated query becomes + If we encounter an `INSERT INTO` query that has been truncated when we fetch it from + `QUERY_HISTORY` view, we drop the actual values and add a single all-null row into + the table definition expression. E.g. the truncated query becomes ```sql INSERT INTO table (col1, col2, ...) VALUES (NULL, NULL, ...) ``` """ - if len(sql) < max_query_length: - return sql pattern = r"insert into [^\(]+ \((?P[^\)]+)\) values" matched = next((re.finditer(pattern, sql, re.IGNORECASE)), None) - if matched is None or matched.span()[0] != 0: + if matched is None or matched.span()[0] != 0 or sql[-1] == ")": return sql column_count = len(matched["columns"].split(",")) diff --git a/pyproject.toml b/pyproject.toml index bfaa6639..8075f7a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.13.187" +version = "0.13.188" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." 
authors = ["Metaphor "] diff --git a/tests/snowflake/test_utils.py b/tests/snowflake/test_utils.py index a6a442a7..4255be98 100644 --- a/tests/snowflake/test_utils.py +++ b/tests/snowflake/test_utils.py @@ -9,8 +9,7 @@ def test_to_quoted_identifier(): def test_truncate_query_text(): truncated = truncate_query_text( - 'INSERT INTO tb (col1, col2, col3, col4) VALUES ("Hey, I just met you,", "And this is crazy,", "But here\'s my number,", "So call me, maybe?")', # Some joke I saw some time ago: https://stackoverflow.com/questions/2139812/what-is-a-callback - max_query_length=50, + 'INSERT INTO tb (col1, col2, col3, col4) VALUES ("Hey, I just met you,", "And this is crazy,", "But here\'s my number,", "So call me, maybe', # Some joke I saw some time ago: https://stackoverflow.com/questions/2139812/what-is-a-callback ) assert ( From 563bde098807fbdb0f19ff6ee600190a0d080a5b Mon Sep 17 00:00:00 2001 From: Tsung-Ju Lii Date: Wed, 8 May 2024 00:29:03 +0800 Subject: [PATCH 3/6] address comments --- metaphor/snowflake/config.py | 5 ++++- metaphor/snowflake/extractor.py | 7 +++---- metaphor/snowflake/utils.py | 13 ++++++------- tests/snowflake/test_utils.py | 6 +++--- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/metaphor/snowflake/config.py b/metaphor/snowflake/config.py index db36deb2..e14fde14 100644 --- a/metaphor/snowflake/config.py +++ b/metaphor/snowflake/config.py @@ -7,11 +7,14 @@ from metaphor.common.filter import DatasetFilter from metaphor.common.tag_matcher import TagMatcher from metaphor.snowflake.auth import SnowflakeAuthConfig -from metaphor.snowflake.utils import DEFAULT_MAX_QUERY_SIZE, DEFAULT_THREAD_POOL_SIZE +from metaphor.snowflake.utils import DEFAULT_THREAD_POOL_SIZE # number of query logs to fetch from Snowflake in one batch DEFAULT_QUERY_LOG_FETCH_SIZE = 100000 +# By default ignore queries larger than 512KiB +DEFAULT_MAX_QUERY_SIZE = 512 * 1024 + @dataclass(config=ConnectorConfig) class SnowflakeQueryLogConfig: diff --git 
a/metaphor/snowflake/extractor.py b/metaphor/snowflake/extractor.py index 292f946d..03c1a8b4 100644 --- a/metaphor/snowflake/extractor.py +++ b/metaphor/snowflake/extractor.py @@ -70,11 +70,11 @@ check_access_history, exclude_username_clause, fetch_query_history_count, + fix_truncated_query_text, str_to_source_type, str_to_stream_type, table_type_to_materialization_type, to_quoted_identifier, - truncate_query_text, ) logger = get_logger() @@ -805,8 +805,7 @@ def _parse_query_logs( ) # Skip large queries - sql_length = len(query_text) - if sql_length > self._query_log_max_query_size: + if len(query_text) > self._query_log_max_query_size: continue # User IDs can be an email address @@ -830,7 +829,7 @@ def _parse_query_logs( bytes_written=safe_float(bytes_written), sources=sources, targets=targets, - sql=truncate_query_text(query_text), + sql=fix_truncated_query_text(query_text), sql_hash=query_hash, ) diff --git a/metaphor/snowflake/utils.py b/metaphor/snowflake/utils.py index f2a8fe7d..3296b913 100644 --- a/metaphor/snowflake/utils.py +++ b/metaphor/snowflake/utils.py @@ -25,11 +25,6 @@ DEFAULT_THREAD_POOL_SIZE = 10 DEFAULT_SLEEP_TIME = 0.1 # 0.1 s -DEFAULT_MAX_QUERY_SIZE = 1000000 -""" -By default ignore queries longer than 100K characters -""" - class SnowflakeTableType(Enum): """ @@ -278,11 +273,15 @@ def is_target_field(field: SchemaField) -> bool: _update_field_system_tag(field, system_tag) -def truncate_query_text(sql: str) -> str: +def fix_truncated_query_text(sql: str) -> str: """ If we encounter an `INSERT INTO` query that has been truncated when we fetch it from `QUERY_HISTORY` view, we drop the actual values and add a single all-null row into - the table definition expression. E.g. the truncated query becomes + the table definition expression. E.g. for a truncated, unterminated query + ```sql + INSERT INTO table (col1, col2, ...) VALUE (v1, v2, ...), ..., (vn1, vn2 + ``` + our logic turns it into ```sql INSERT INTO table (col1, col2, ...) 
VALUES (NULL, NULL, ...) ``` diff --git a/tests/snowflake/test_utils.py b/tests/snowflake/test_utils.py index 4255be98..0159b62e 100644 --- a/tests/snowflake/test_utils.py +++ b/tests/snowflake/test_utils.py @@ -1,4 +1,4 @@ -from metaphor.snowflake.utils import to_quoted_identifier, truncate_query_text +from metaphor.snowflake.utils import fix_truncated_query_text, to_quoted_identifier def test_to_quoted_identifier(): @@ -7,8 +7,8 @@ def test_to_quoted_identifier(): assert to_quoted_identifier(["db", "sc", 'ta"@BLE']) == '"db"."sc"."ta""@BLE"' -def test_truncate_query_text(): - truncated = truncate_query_text( +def test_fix_truncated_query_text(): + truncated = fix_truncated_query_text( 'INSERT INTO tb (col1, col2, col3, col4) VALUES ("Hey, I just met you,", "And this is crazy,", "But here\'s my number,", "So call me, maybe', # Some joke I saw some time ago: https://stackoverflow.com/questions/2139812/what-is-a-callback ) From c7cc61fa5e2070e60eb0cf37b37857f348ce492f Mon Sep 17 00:00:00 2001 From: Tsung-Ju Lii Date: Wed, 8 May 2024 01:16:51 +0800 Subject: [PATCH 4/6] lower max query length to filter out truncated queries --- metaphor/snowflake/config.py | 4 ++-- metaphor/snowflake/extractor.py | 3 +-- metaphor/snowflake/utils.py | 26 -------------------------- tests/snowflake/test_utils.py | 13 +------------ 4 files changed, 4 insertions(+), 42 deletions(-) diff --git a/metaphor/snowflake/config.py b/metaphor/snowflake/config.py index e14fde14..a6018187 100644 --- a/metaphor/snowflake/config.py +++ b/metaphor/snowflake/config.py @@ -12,8 +12,8 @@ # number of query logs to fetch from Snowflake in one batch DEFAULT_QUERY_LOG_FETCH_SIZE = 100000 -# By default ignore queries larger than 512KiB -DEFAULT_MAX_QUERY_SIZE = 512 * 1024 +# By default ignore queries longer than 100K characters +DEFAULT_MAX_QUERY_SIZE = 100_000 @dataclass(config=ConnectorConfig) diff --git a/metaphor/snowflake/extractor.py b/metaphor/snowflake/extractor.py index 03c1a8b4..8101d93e 100644 --- 
a/metaphor/snowflake/extractor.py +++ b/metaphor/snowflake/extractor.py @@ -70,7 +70,6 @@ check_access_history, exclude_username_clause, fetch_query_history_count, - fix_truncated_query_text, str_to_source_type, str_to_stream_type, table_type_to_materialization_type, @@ -829,7 +828,7 @@ def _parse_query_logs( bytes_written=safe_float(bytes_written), sources=sources, targets=targets, - sql=fix_truncated_query_text(query_text), + sql=query_text, sql_hash=query_hash, ) diff --git a/metaphor/snowflake/utils.py b/metaphor/snowflake/utils.py index 3296b913..1c9e3045 100644 --- a/metaphor/snowflake/utils.py +++ b/metaphor/snowflake/utils.py @@ -1,5 +1,4 @@ import logging -import re import time from concurrent import futures from dataclasses import dataclass @@ -271,28 +270,3 @@ def is_target_field(field: SchemaField) -> bool: field = next((f for f in fields if is_target_field(f)), None) if field: _update_field_system_tag(field, system_tag) - - -def fix_truncated_query_text(sql: str) -> str: - """ - If we encounter an `INSERT INTO` query that has been truncated when we fetch it from - `QUERY_HISTORY` view, we drop the actual values and add a single all-null row into - the table definition expression. E.g. for a truncated, unterminated query - ```sql - INSERT INTO table (col1, col2, ...) VALUE (v1, v2, ...), ..., (vn1, vn2 - ``` - our logic turns it into - ```sql - INSERT INTO table (col1, col2, ...) VALUES (NULL, NULL, ...) 
- ``` - """ - - pattern = r"insert into [^\(]+ \((?P[^\)]+)\) values" - matched = next((re.finditer(pattern, sql, re.IGNORECASE)), None) - if matched is None or matched.span()[0] != 0 or sql[-1] == ")": - return sql - - column_count = len(matched["columns"].split(",")) - return ( - f"{sql[:matched.span()[-1]]} ({', '.join('NULL' for _ in range(column_count))})" - ) diff --git a/tests/snowflake/test_utils.py b/tests/snowflake/test_utils.py index 0159b62e..3bd141ad 100644 --- a/tests/snowflake/test_utils.py +++ b/tests/snowflake/test_utils.py @@ -1,18 +1,7 @@ -from metaphor.snowflake.utils import fix_truncated_query_text, to_quoted_identifier +from metaphor.snowflake.utils import to_quoted_identifier def test_to_quoted_identifier(): assert to_quoted_identifier([None, "", "a", "b", "c"]) == '"a"."b"."c"' assert to_quoted_identifier(["db", "sc", 'ta"@BLE']) == '"db"."sc"."ta""@BLE"' - - -def test_fix_truncated_query_text(): - truncated = fix_truncated_query_text( - 'INSERT INTO tb (col1, col2, col3, col4) VALUES ("Hey, I just met you,", "And this is crazy,", "But here\'s my number,", "So call me, maybe', # Some joke I saw some time ago: https://stackoverflow.com/questions/2139812/what-is-a-callback - ) - - assert ( - truncated - == "INSERT INTO tb (col1, col2, col3, col4) VALUES (NULL, NULL, NULL, NULL)" - ) From 443eb909147bcc61973710aa5915ea1aa20cdddb Mon Sep 17 00:00:00 2001 From: Tsung-Ju Lii Date: Wed, 8 May 2024 16:07:07 +0800 Subject: [PATCH 5/6] address comments --- metaphor/snowflake/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metaphor/snowflake/config.py b/metaphor/snowflake/config.py index a6018187..57bf15b9 100644 --- a/metaphor/snowflake/config.py +++ b/metaphor/snowflake/config.py @@ -13,6 +13,7 @@ DEFAULT_QUERY_LOG_FETCH_SIZE = 100000 # By default ignore queries longer than 100K characters +# Reference: https://docs.snowflake.com/en/sql-reference/account-usage/query_history DEFAULT_MAX_QUERY_SIZE = 100_000 From 
0a7957f57832962c8b814dbab96775a7fa634c87 Mon Sep 17 00:00:00 2001 From: Tsung-Ju Lii Date: Wed, 8 May 2024 22:48:49 +0800 Subject: [PATCH 6/6] bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 987a7fff..551307f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.1" +version = "0.14.2" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "]