From 08861d0e868333cf47c55e928940b18fedd86f02 Mon Sep 17 00:00:00 2001 From: Scott Huang Date: Tue, 3 Sep 2024 23:26:11 +0800 Subject: [PATCH 1/3] Collect all valid query log except commands --- metaphor/common/sql/process_query/config.py | 15 +++++-- .../common/sql/process_query/process_query.py | 5 ++- metaphor/postgresql/extractor.py | 4 -- tests/common/sql/process_query/test_config.py | 6 ++- .../sql/process_query/test_process_query.py | 19 ++++---- tests/postgresql/test_extractor.py | 44 ++++++++++++++++++- 6 files changed, 75 insertions(+), 18 deletions(-) diff --git a/metaphor/common/sql/process_query/config.py b/metaphor/common/sql/process_query/config.py index 0b81fba4..d31ffd37 100644 --- a/metaphor/common/sql/process_query/config.py +++ b/metaphor/common/sql/process_query/config.py @@ -54,20 +54,29 @@ class ProcessQueryConfig: default_factory=lambda: RedactPIILiteralsConfig() ) - ignore_insert_values_into: bool = False """ Ignore `INSERT INTO ... VALUES` expressions. These expressions don't have any lineage information, and are often very large in size. """ + ignore_insert_values_into: bool = False - skip_unparsable_queries: bool = False """ If this is set to `True`, when Sqlglot fails to parse a query we skip it from the collected MCE. """ + skip_unparsable_queries: bool = False + + """ + Skip commands that interact with databases, such as: create user + """ + ignore_command_statement: bool = True @property def should_process(self) -> bool: """ Whether we should run the processing method at all. """ - return self.redact_literals.enabled or self.ignore_insert_values_into + return ( + self.redact_literals.enabled + or self.ignore_insert_values_into + or self.ignore_command_statement + ) diff --git a/metaphor/common/sql/process_query/process_query.py b/metaphor/common/sql/process_query/process_query.py index 0284df59..d6839123 100644 --- a/metaphor/common/sql/process_query/process_query.py +++ b/metaphor/common/sql/process_query/process_query.py @@ -75,8 +75,11 @@ def process_query( if config.ignore_insert_values_into and _is_insert_values_into(expression): return None + if config.ignore_command_statement and isinstance(expression, exp.Command): + return None + if not config.redact_literals.enabled: - return expression.sql(dialect=dialect) + return updated DialectClass: t.Type[Dialect] if dialect is None: diff --git a/metaphor/postgresql/extractor.py b/metaphor/postgresql/extractor.py index b2c03c34..b2babe3d 100644 --- a/metaphor/postgresql/extractor.py +++ b/metaphor/postgresql/extractor.py @@ -479,10 +479,6 @@ def _process_cloud_watch_log( default_database=parsed.database, ) - # Skip SQL statement that is not related to any table - if not tll.sources and not tll.targets: - return None - # Skip if parsed sources or targets has invalid data. if not is_valid_queried_datasets(tll.sources) or not is_valid_queried_datasets( tll.targets diff --git a/tests/common/sql/process_query/test_config.py b/tests/common/sql/process_query/test_config.py index 9aec4eb6..5986f8cb 100644 --- a/tests/common/sql/process_query/test_config.py +++ b/tests/common/sql/process_query/test_config.py @@ -10,9 +10,13 @@ def test_config(): assert config.should_process config = ProcessQueryConfig() + assert config.should_process + + config = ProcessQueryConfig(ignore_command_statement=False) assert not config.should_process config = ProcessQueryConfig( - redact_literals=RedactPIILiteralsConfig(where_clauses=True) + redact_literals=RedactPIILiteralsConfig(where_clauses=True), + ignore_command_statement=False, ) assert not config.should_process diff --git a/tests/common/sql/process_query/test_process_query.py b/tests/common/sql/process_query/test_process_query.py index ba1181c9..679c98b0 100644 --- a/tests/common/sql/process_query/test_process_query.py +++ b/tests/common/sql/process_query/test_process_query.py @@ -18,19 +18,22 @@ ignore_insert_values_into=True, ) +pre_preprocess_only = ProcessQueryConfig() + @pytest.mark.parametrize( - ["name", "platform"], + ["name", "platform", "config"], [ - ("redact_literal_values_in_where_clauses", DataPlatform.SNOWFLAKE), - ("redact_simple", DataPlatform.SNOWFLAKE), - ("snowflake_copy_into", DataPlatform.SNOWFLAKE), - ("redact_literals_in_where_in", DataPlatform.MSSQL), - ("redact_literals_in_where_or", DataPlatform.MSSQL), - ("redact_merge_insert_when_not_matched", DataPlatform.MSSQL), + ("redact_literal_values_in_where_clauses", DataPlatform.SNOWFLAKE, config), + ("redact_simple", DataPlatform.SNOWFLAKE, config), + ("snowflake_copy_into", DataPlatform.SNOWFLAKE, config), + ("snowflake_copy_into", DataPlatform.SNOWFLAKE, pre_preprocess_only), + ("redact_literals_in_where_in", DataPlatform.MSSQL, config), + ("redact_literals_in_where_or", DataPlatform.MSSQL, config), + ("redact_merge_insert_when_not_matched", DataPlatform.MSSQL, config), ], ) -def test_process_query(name: str, platform: DataPlatform): +def test_process_query(name: str, platform: DataPlatform, config: ProcessQueryConfig): dir = Path(__file__).parent / "process_query" / name with open(dir / "query.sql") as f: sql = f.read() diff --git a/tests/postgresql/test_extractor.py b/tests/postgresql/test_extractor.py index 9f2c345e..8012f199 100644 --- a/tests/postgresql/test_extractor.py +++ b/tests/postgresql/test_extractor.py @@ -132,7 +132,7 @@ def test_process_cloud_watch_log(): == gold ) - # SQL don't have source and targets + # Skip commands assert ( extractor._process_cloud_watch_log( "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: CREATE USER abc PASSWORD 'asjdkasjd';", @@ -243,3 +243,45 @@ async def test_extractor(mocked_fetch_databases): extractor = PostgreSQLExtractor(dummy_config()) events = [e for e in await extractor.extract()] assert events == [] + + +def test_alter_rename(): + extractor = PostgreSQLExtractor(dummy_config()) + + cache = {} + + # Valid Statement + assert ( + extractor._process_cloud_watch_log( + "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: ALTER TABLE schema.table_tmp RENAME TO table;", + cache, + ) + is None + ) + assert extractor._process_cloud_watch_log( + "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms", + cache, + ) == QueryLog( + id="POSTGRESQL:e6c7e28a652b40dc29c1a5ed5c1b1f16", + account=None, + bytes_read=None, + bytes_written=None, + cost=None, + default_database="metaphor", + default_schema=None, + duration=55.66, + email=None, + metadata=None, + parsing=None, + platform=DataPlatform.POSTGRESQL, + query_id="e6c7e28a652b40dc29c1a5ed5c1b1f16", + rows_read=None, + rows_written=None, + sources=[], + sql="ALTER TABLE schema.table_tmp RENAME TO table;", + sql_hash="e6c7e28a652b40dc29c1a5ed5c1b1f16", + start_time=datetime(2024, 8, 29, 9, 25, 50, tzinfo=timezone.utc), + targets=[], + type=None, + user_id="metaphor", + ) From 1a06b24b443446562e0ebad2d0edfd779850ba24 Mon Sep 17 00:00:00 2001 From: Scott Huang Date: Tue, 3 Sep 2024 23:26:39 +0800 Subject: [PATCH 2/3] Bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 72048122..5f92f09e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.98" +version = "0.14.99" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] From 553a12b069fe4ef0d53643cd61010f745aec3ae8 Mon Sep 17 00:00:00 2001 From: Scott Huang Date: Tue, 3 Sep 2024 23:42:40 +0800 Subject: [PATCH 3/3] Address comments --- metaphor/common/sql/process_query/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metaphor/common/sql/process_query/config.py b/metaphor/common/sql/process_query/config.py index d31ffd37..7166d9a9 100644 --- a/metaphor/common/sql/process_query/config.py +++ b/metaphor/common/sql/process_query/config.py @@ -54,21 +54,21 @@ class ProcessQueryConfig: default_factory=lambda: RedactPIILiteralsConfig() ) + ignore_insert_values_into: bool = False """ Ignore `INSERT INTO ... VALUES` expressions. These expressions don't have any lineage information, and are often very large in size. """ - ignore_insert_values_into: bool = False + skip_unparsable_queries: bool = False """ If this is set to `True`, when Sqlglot fails to parse a query we skip it from the collected MCE. """ - skip_unparsable_queries: bool = False + ignore_command_statement: bool = True """ Skip commands that interact with databases, such as: create user """ - ignore_command_statement: bool = True @property def should_process(self) -> bool: