diff --git a/metaphor/common/aws.py b/metaphor/common/aws.py index def6975c..59c32511 100644 --- a/metaphor/common/aws.py +++ b/metaphor/common/aws.py @@ -58,7 +58,10 @@ def get_session(self): def iterate_logs_from_cloud_watch( - client: client, lookback_days: int, logs_group: str + client: client, + lookback_days: int, + logs_group: str, + filter_pattern: Optional[str] = None, ) -> Iterator[str]: logger.info(f"Collecting query log from cloud watch for {lookback_days} days") @@ -79,6 +82,8 @@ def iterate_logs_from_cloud_watch( } if next_token: params["nextToken"] = next_token + if filter_pattern: + params["filterPattern"] = filter_pattern response = client.filter_log_events(**params) next_token = response["nextToken"] if "nextToken" in response else None diff --git a/metaphor/common/sql/process_query/process_query.py b/metaphor/common/sql/process_query/process_query.py index 25308a4f..4ac9126e 100644 --- a/metaphor/common/sql/process_query/process_query.py +++ b/metaphor/common/sql/process_query/process_query.py @@ -22,6 +22,15 @@ def _is_insert_values_into(expression: Expression) -> bool: ) +ALLOW_EXPRESSION_TYPES = ( + exp.Alter, + exp.DDL, + exp.DML, + exp.Merge, + exp.Query, +) + + def process_query( sql: str, data_platform: DataPlatform, @@ -78,11 +87,7 @@ def process_query( if config.ignore_command_statement and isinstance(expression, exp.Command): return None - if ( - isinstance(expression, exp.Show) - or isinstance(expression, exp.Set) - or isinstance(expression, exp.Rollback) - ): + if not isinstance(expression, ALLOW_EXPRESSION_TYPES): return None if not config.redact_literals.enabled: diff --git a/metaphor/postgresql/README.md b/metaphor/postgresql/README.md index a1b4e5df..3ad2af02 100644 --- a/metaphor/postgresql/README.md +++ b/metaphor/postgresql/README.md @@ -56,6 +56,12 @@ query_log: # (Optional) Number of days of query logs to fetch. Default to 1. If 0, the no query logs will be fetched. lookback_days: + + # (Optional) Extract query log duration if postgres parameter `log_duration` is true + log_duration_enabled: + + # (Optional) CloudWatch log filter pattern, https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html + filter_pattern: # For example: -root LOG # (Optional) A list of users whose queries will be excluded from the log fetching. excluded_usernames: diff --git a/metaphor/postgresql/config.py b/metaphor/postgresql/config.py index 23c532b6..26a66d3f 100644 --- a/metaphor/postgresql/config.py +++ b/metaphor/postgresql/config.py @@ -45,6 +45,8 @@ class BasePostgreSQLRunConfig(BaseConfig): @dataclass(config=ConnectorConfig) class PostgreSQLQueryLogConfig(QueryLogConfig): aws: Optional[AwsCredentials] = None + filter_pattern: Optional[str] = None + log_duration_enabled: bool = False logs_group: Optional[str] = None diff --git a/metaphor/postgresql/extractor.py b/metaphor/postgresql/extractor.py index c011320b..cbac3316 100644 --- a/metaphor/postgresql/extractor.py +++ b/metaphor/postgresql/extractor.py @@ -412,18 +412,26 @@ def collect_query_logs(self) -> Iterator[QueryLog]: and self._query_log_config.logs_group is not None ): client = self._query_log_config.aws.get_session().client("logs") - lookback_days = self._query_log_config.lookback_days - logs_group = self._query_log_config.logs_group for message in iterate_logs_from_cloud_watch( - client, lookback_days, logs_group + client=client, + lookback_days=self._query_log_config.lookback_days, + logs_group=self._query_log_config.logs_group, + filter_pattern=self._query_log_config.filter_pattern, ): - query_log = self._process_cloud_watch_log(message, previous_line_cache) + query_log = self._process_cloud_watch_log( + message, + previous_line_cache, + self._query_log_config.log_duration_enabled, + ) if query_log: yield query_log def _process_cloud_watch_log( - self, log: str, previous_line_cache: Dict[str, ParsedLog] + self, + log: str, + previous_line_cache: Dict[str, ParsedLog], + log_duration_enabled=True, ) -> Optional[QueryLog]: """ SQL statement and duration are in two consecutive record, example: @@ -461,9 +469,11 @@ def _process_cloud_watch_log( previous_line = previous_line_cache.get(parsed.session) previous_line_cache[parsed.session] = parsed + if not log_duration_enabled: + previous_line = parsed # The second line must be: duration: ms # Skip log that don't have previous line or invalid log - if ( + elif ( not parsed.log_body[0].lstrip().startswith("duration") or not previous_line or len(previous_line.log_body) < 2 @@ -473,7 +483,7 @@ def _process_cloud_watch_log( message_type = previous_line.log_body[0].lstrip() # Only `statement` (simple query), and `execute` (extended query) we should care about - if not message_type.startswith("statement") or message_type.startswith( + if not message_type.startswith("statement") and not message_type.startswith( "execute" ): return None @@ -484,7 +494,9 @@ def _process_cloud_watch_log( query = ":".join(previous_line.log_body[1:]).lstrip() # Extract duration from the current line - duration = self._extract_duration(parsed.log_body[1]) + duration = ( + self._extract_duration(parsed.log_body[1]) if log_duration_enabled else None + ) tll = extract_table_level_lineage( sql=query, diff --git a/pyproject.toml b/pyproject.toml index eac58b73..66935b86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.121" +version = "0.14.122" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/postgresql/expected_logs_execute.json b/tests/postgresql/expected_logs_execute.json new file mode 100644 index 00000000..df6c0b15 --- /dev/null +++ b/tests/postgresql/expected_logs_execute.json @@ -0,0 +1,40 @@ +[ + { + "_id": "POSTGRESQL:9f07fea1cd4a1223a3695b4865f40d82", + "defaultDatabase": "metaphor", + "platform": "POSTGRESQL", + "queryId": "9f07fea1cd4a1223a3695b4865f40d82", + "sources": [ + { + "database": "metaphor", + "id": "DATASET~F68D8D6F1F49DA4605F13F20FD3CA883", + "schema": "schema", + "table": "table" + } + ], + "sql": "SELECT * FROM schema.table", + "sqlHash": "9f07fea1cd4a1223a3695b4865f40d82", + "startTime": "2024-08-29T09:25:50+00:00", + "targets": [], + "userId": "metaphor" + }, + { + "_id": "POSTGRESQL:06903bf0b96b1021e32edad5cf2b15f1", + "defaultDatabase": "metaphor", + "platform": "POSTGRESQL", + "queryId": "06903bf0b96b1021e32edad5cf2b15f1", + "sources": [ + { + "database": "metaphor", + "id": "DATASET~1149097F32B03CEE64BB356B77701736", + "schema": "schema2", + "table": "table" + } + ], + "sql": "SELECT * FROM schema2.table", + "sqlHash": "06903bf0b96b1021e32edad5cf2b15f1", + "startTime": "2024-08-29T09:25:50+00:00", + "targets": [], + "userId": "metaphor" + } +] diff --git a/tests/postgresql/test_extractor.py b/tests/postgresql/test_extractor.py index bbec7301..aaf74349 100644 --- a/tests/postgresql/test_extractor.py +++ b/tests/postgresql/test_extractor.py @@ -69,6 +69,7 @@ def dummy_config(**args): lookback_days=1, logs_group="group", excluded_usernames={"foo"}, + log_duration_enabled=True, aws=AwsCredentials( access_key_id="", secret_access_key="", @@ -220,9 +221,13 @@ async def test_extractor( mock_session.client.return_value = 1 mocked_get_session.return_value = mock_session - def mock_iterate_logs(a, b, c): - yield "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;" - yield "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms" + logs = [ + "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;", + "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms", + ] + + def mock_iterate_logs(**_): + yield from logs mocked_iterate_logs.side_effect = mock_iterate_logs @@ -274,3 +279,43 @@ def test_alter_rename(): type=None, user_id="metaphor", ) + + +@patch("metaphor.common.aws.AwsCredentials.get_session") +@patch("metaphor.postgresql.extractor.iterate_logs_from_cloud_watch") +def test_collect_query_logs( + mocked_iterate_logs: MagicMock, + mocked_get_session: MagicMock, + test_root_dir: str, +): + mock_session = MagicMock() + mock_session.client.return_value = 1 + mocked_get_session.return_value = mock_session + + date = "2024-08-29 09:25:50 UTC" + host = "10.1.1.134(48507)" + user_db = "metaphor@metaphor" + + logs = [ + f"{date}:{host}:{user_db}:[615]:LOG: execute : BEGIN", + f"{date}:{host}:{user_db}:[615]:LOG: execute : SELECT * FROM schema.table", + f"{date}:{host}:{user_db}:[615]:LOG: execute S_1: COMMIT", + f"{date}:{host}:{user_db}:[616]:LOG: execute : BEGIN", + f"{date}:{host}:{user_db}:[616]:LOG: execute : SELECT * FROM schema2.table", + f"{date}:{host}:{user_db}:[616]:LOG: execute S_1: COMMIT", + ] + + def mock_iterate_logs(**_): + yield from logs + + mocked_iterate_logs.side_effect = mock_iterate_logs + + config = dummy_config() + assert config.query_log + config.query_log.log_duration_enabled = False + + extractor = PostgreSQLExtractor(config) + log_events = [EventUtil.trim_event(e) for e in extractor.collect_query_logs()] + assert log_events == load_json( + f"{test_root_dir}/postgresql/expected_logs_execute.json" + )