diff --git a/metaphor/common/sql/process_query/process_query.py b/metaphor/common/sql/process_query/process_query.py index 25308a4f..46a3a924 100644 --- a/metaphor/common/sql/process_query/process_query.py +++ b/metaphor/common/sql/process_query/process_query.py @@ -82,6 +82,7 @@ def process_query( isinstance(expression, exp.Show) or isinstance(expression, exp.Set) or isinstance(expression, exp.Rollback) + or isinstance(expression, exp.Commit) ): return None diff --git a/metaphor/postgresql/config.py b/metaphor/postgresql/config.py index 23c532b6..6e584222 100644 --- a/metaphor/postgresql/config.py +++ b/metaphor/postgresql/config.py @@ -45,6 +45,7 @@ class BasePostgreSQLRunConfig(BaseConfig): @dataclass(config=ConnectorConfig) class PostgreSQLQueryLogConfig(QueryLogConfig): aws: Optional[AwsCredentials] = None + log_duration_enabled: bool = False logs_group: Optional[str] = None diff --git a/metaphor/postgresql/extractor.py b/metaphor/postgresql/extractor.py index c011320b..b26fbe2f 100644 --- a/metaphor/postgresql/extractor.py +++ b/metaphor/postgresql/extractor.py @@ -418,12 +418,19 @@ def collect_query_logs(self) -> Iterator[QueryLog]: for message in iterate_logs_from_cloud_watch( client, lookback_days, logs_group ): - query_log = self._process_cloud_watch_log(message, previous_line_cache) + query_log = self._process_cloud_watch_log( + message, + previous_line_cache, + self._query_log_config.log_duration_enabled, + ) if query_log: yield query_log def _process_cloud_watch_log( - self, log: str, previous_line_cache: Dict[str, ParsedLog] + self, + log: str, + previous_line_cache: Dict[str, ParsedLog], + log_duration_enabled=True, ) -> Optional[QueryLog]: """ SQL statement and duration are in two consecutive record, example: @@ -461,9 +468,11 @@ def _process_cloud_watch_log( previous_line = previous_line_cache.get(parsed.session) previous_line_cache[parsed.session] = parsed + if not log_duration_enabled: + previous_line = parsed # The second line must be: duration: ms # Skip log that don't have previous line or invalid log - if ( + elif ( not parsed.log_body[0].lstrip().startswith("duration") or not previous_line or len(previous_line.log_body) < 2 @@ -473,7 +482,7 @@ def _process_cloud_watch_log( message_type = previous_line.log_body[0].lstrip() # Only `statement` (simple query), and `execute` (extended query) we should care about - if not message_type.startswith("statement") or message_type.startswith( + if not message_type.startswith("statement") and not message_type.startswith( "execute" ): return None @@ -484,7 +493,9 @@ def _process_cloud_watch_log( query = ":".join(previous_line.log_body[1:]).lstrip() # Extract duration from the current line - duration = self._extract_duration(parsed.log_body[1]) + duration = ( + self._extract_duration(parsed.log_body[1]) if log_duration_enabled else None + ) tll = extract_table_level_lineage( sql=query, diff --git a/tests/postgresql/expected_logs_execute.json b/tests/postgresql/expected_logs_execute.json new file mode 100644 index 00000000..df6c0b15 --- /dev/null +++ b/tests/postgresql/expected_logs_execute.json @@ -0,0 +1,40 @@ +[ + { + "_id": "POSTGRESQL:9f07fea1cd4a1223a3695b4865f40d82", + "defaultDatabase": "metaphor", + "platform": "POSTGRESQL", + "queryId": "9f07fea1cd4a1223a3695b4865f40d82", + "sources": [ + { + "database": "metaphor", + "id": "DATASET~F68D8D6F1F49DA4605F13F20FD3CA883", + "schema": "schema", + "table": "table" + } + ], + "sql": "SELECT * FROM schema.table", + "sqlHash": "9f07fea1cd4a1223a3695b4865f40d82", + "startTime": "2024-08-29T09:25:50+00:00", + "targets": [], + "userId": "metaphor" + }, + { + "_id": "POSTGRESQL:06903bf0b96b1021e32edad5cf2b15f1", + "defaultDatabase": "metaphor", + "platform": "POSTGRESQL", + "queryId": "06903bf0b96b1021e32edad5cf2b15f1", + "sources": [ + { + "database": "metaphor", + "id": "DATASET~1149097F32B03CEE64BB356B77701736", + "schema": "schema2", + "table": "table" + } + ], + "sql": "SELECT * FROM schema2.table", + "sqlHash": "06903bf0b96b1021e32edad5cf2b15f1", + "startTime": "2024-08-29T09:25:50+00:00", + "targets": [], + "userId": "metaphor" + } +] diff --git a/tests/postgresql/test_extractor.py b/tests/postgresql/test_extractor.py index bbec7301..8547ccb1 100644 --- a/tests/postgresql/test_extractor.py +++ b/tests/postgresql/test_extractor.py @@ -69,6 +69,7 @@ def dummy_config(**args): lookback_days=1, logs_group="group", excluded_usernames={"foo"}, + log_duration_enabled=True, aws=AwsCredentials( access_key_id="", secret_access_key="", @@ -220,9 +221,13 @@ async def test_extractor( mock_session.client.return_value = 1 mocked_get_session.return_value = mock_session - def mock_iterate_logs(a, b, c): - yield "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;" - yield "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms" + logs = [ + "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;", + "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms", + ] + + def mock_iterate_logs(_1, _2, _3): + yield from logs mocked_iterate_logs.side_effect = mock_iterate_logs @@ -274,3 +279,43 @@ def test_alter_rename(): type=None, user_id="metaphor", ) + + +@patch("metaphor.common.aws.AwsCredentials.get_session") +@patch("metaphor.postgresql.extractor.iterate_logs_from_cloud_watch") +def test_collect_query_logs( + mocked_iterate_logs: MagicMock, + mocked_get_session: MagicMock, + test_root_dir: str, +): + mock_session = MagicMock() + mock_session.client.return_value = 1 + mocked_get_session.return_value = mock_session + + date = "2024-08-29 09:25:50 UTC" + host = "10.1.1.134(48507)" + user_db = "metaphor@metaphor" + + logs = [ + f"{date}:{host}:{user_db}:[615]:LOG: execute : BEGIN", + f"{date}:{host}:{user_db}:[615]:LOG: execute : SELECT * FROM schema.table", + f"{date}:{host}:{user_db}:[615]:LOG: execute S_1: COMMIT", + f"{date}:{host}:{user_db}:[616]:LOG: execute : BEGIN", + f"{date}:{host}:{user_db}:[616]:LOG: execute : SELECT * FROM schema2.table", + f"{date}:{host}:{user_db}:[616]:LOG: execute S_1: COMMIT", + ] + + def mock_iterate_logs(_1, _2, _3): + yield from logs + + mocked_iterate_logs.side_effect = mock_iterate_logs + + config = dummy_config() + assert config.query_log + config.query_log.log_duration_enabled = False + + extractor = PostgreSQLExtractor(config) + log_events = [EventUtil.trim_event(e) for e in extractor.collect_query_logs()] + assert log_events == load_json( + f"{test_root_dir}/postgresql/expected_logs_execute.json" + )