Skip to content

Commit

Permalink
Extract query with log_duration: false
Browse files Browse the repository at this point in the history
  • Loading branch information
elic-eon committed Oct 9, 2024
1 parent 59ee58f commit 100d38f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 8 deletions.
1 change: 1 addition & 0 deletions metaphor/common/sql/process_query/process_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def process_query(
isinstance(expression, exp.Show)
or isinstance(expression, exp.Set)
or isinstance(expression, exp.Rollback)
or isinstance(expression, exp.Commit)
):
return None

Expand Down
1 change: 1 addition & 0 deletions metaphor/postgresql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class BasePostgreSQLRunConfig(BaseConfig):
@dataclass(config=ConnectorConfig)
class PostgreSQLQueryLogConfig(QueryLogConfig):
aws: Optional[AwsCredentials] = None
log_duration_enabled: bool = False
logs_group: Optional[str] = None


Expand Down
21 changes: 16 additions & 5 deletions metaphor/postgresql/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,19 @@ def collect_query_logs(self) -> Iterator[QueryLog]:
for message in iterate_logs_from_cloud_watch(
client, lookback_days, logs_group
):
query_log = self._process_cloud_watch_log(message, previous_line_cache)
query_log = self._process_cloud_watch_log(
message,
previous_line_cache,
self._query_log_config.log_duration_enabled,
)
if query_log:
yield query_log

def _process_cloud_watch_log(
self, log: str, previous_line_cache: Dict[str, ParsedLog]
self,
log: str,
previous_line_cache: Dict[str, ParsedLog],
log_duration_enabled=True,
) -> Optional[QueryLog]:
"""
SQL statement and duration are in two consecutive record, example:
Expand Down Expand Up @@ -461,9 +468,11 @@ def _process_cloud_watch_log(
previous_line = previous_line_cache.get(parsed.session)
previous_line_cache[parsed.session] = parsed

if not log_duration_enabled:
previous_line = parsed
# The second line must be: duration: <number> ms
# Skip log that don't have previous line or invalid log
if (
elif (
not parsed.log_body[0].lstrip().startswith("duration")
or not previous_line
or len(previous_line.log_body) < 2
Expand All @@ -473,7 +482,7 @@ def _process_cloud_watch_log(
message_type = previous_line.log_body[0].lstrip()

# Only `statement` (simple query), and `execute` (extended query) we should care about
if not message_type.startswith("statement") or message_type.startswith(
if not message_type.startswith("statement") and not message_type.startswith(
"execute"
):
return None
Expand All @@ -484,7 +493,9 @@ def _process_cloud_watch_log(
query = ":".join(previous_line.log_body[1:]).lstrip()

# Extract duration from the current line
duration = self._extract_duration(parsed.log_body[1])
duration = (
self._extract_duration(parsed.log_body[1]) if log_duration_enabled else None
)

tll = extract_table_level_lineage(
sql=query,
Expand Down
40 changes: 40 additions & 0 deletions tests/postgresql/expected_logs_execute.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[
{
"_id": "POSTGRESQL:9f07fea1cd4a1223a3695b4865f40d82",
"defaultDatabase": "metaphor",
"platform": "POSTGRESQL",
"queryId": "9f07fea1cd4a1223a3695b4865f40d82",
"sources": [
{
"database": "metaphor",
"id": "DATASET~F68D8D6F1F49DA4605F13F20FD3CA883",
"schema": "schema",
"table": "table"
}
],
"sql": "SELECT * FROM schema.table",
"sqlHash": "9f07fea1cd4a1223a3695b4865f40d82",
"startTime": "2024-08-29T09:25:50+00:00",
"targets": [],
"userId": "metaphor"
},
{
"_id": "POSTGRESQL:06903bf0b96b1021e32edad5cf2b15f1",
"defaultDatabase": "metaphor",
"platform": "POSTGRESQL",
"queryId": "06903bf0b96b1021e32edad5cf2b15f1",
"sources": [
{
"database": "metaphor",
"id": "DATASET~1149097F32B03CEE64BB356B77701736",
"schema": "schema2",
"table": "table"
}
],
"sql": "SELECT * FROM schema2.table",
"sqlHash": "06903bf0b96b1021e32edad5cf2b15f1",
"startTime": "2024-08-29T09:25:50+00:00",
"targets": [],
"userId": "metaphor"
}
]
51 changes: 48 additions & 3 deletions tests/postgresql/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def dummy_config(**args):
lookback_days=1,
logs_group="group",
excluded_usernames={"foo"},
log_duration_enabled=True,
aws=AwsCredentials(
access_key_id="",
secret_access_key="",
Expand Down Expand Up @@ -220,9 +221,13 @@ async def test_extractor(
mock_session.client.return_value = 1
mocked_get_session.return_value = mock_session

def mock_iterate_logs(a, b, c):
yield "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;"
yield "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms"
logs = [
"2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;",
"2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms",
]

def mock_iterate_logs(_1, _2, _3):
yield from logs

mocked_iterate_logs.side_effect = mock_iterate_logs

Expand Down Expand Up @@ -274,3 +279,43 @@ def test_alter_rename():
type=None,
user_id="metaphor",
)


@patch("metaphor.common.aws.AwsCredentials.get_session")
@patch("metaphor.postgresql.extractor.iterate_logs_from_cloud_watch")
def test_collect_query_logs(
    mocked_iterate_logs: MagicMock,
    mocked_get_session: MagicMock,
    test_root_dir: str,
):
    """Extract query logs from `execute` records with log_duration disabled.

    Feeds two interleaved sessions ([615] and [616]) of BEGIN / SELECT /
    COMMIT records through the extractor and checks the emitted events
    against the expected JSON fixture.
    """
    session = MagicMock()
    session.client.return_value = 1
    mocked_get_session.return_value = session

    # Common log-line prefix: timestamp, client host:port, user@database.
    prefix = "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor"

    raw_logs = [
        f"{prefix}:[615]:LOG: execute <unnamed>: BEGIN",
        f"{prefix}:[615]:LOG: execute <unnamed>: SELECT * FROM schema.table",
        f"{prefix}:[615]:LOG: execute S_1: COMMIT",
        f"{prefix}:[616]:LOG: execute <unnamed>: BEGIN",
        f"{prefix}:[616]:LOG: execute <unnamed>: SELECT * FROM schema2.table",
        f"{prefix}:[616]:LOG: execute S_1: COMMIT",
    ]

    # Ignore the (client, lookback_days, logs_group) arguments and replay
    # the canned CloudWatch messages.
    mocked_iterate_logs.side_effect = lambda *_: iter(raw_logs)

    config = dummy_config()
    assert config.query_log
    config.query_log.log_duration_enabled = False

    extractor = PostgreSQLExtractor(config)
    events = [EventUtil.trim_event(event) for event in extractor.collect_query_logs()]
    expected = load_json(f"{test_root_dir}/postgresql/expected_logs_execute.json")
    assert events == expected

0 comments on commit 100d38f

Please sign in to comment.