Support Postgres log extraction with log_duration disabled #1004

Merged
7 changes: 6 additions & 1 deletion metaphor/common/aws.py
@@ -58,7 +58,10 @@


def iterate_logs_from_cloud_watch(
client: client, lookback_days: int, logs_group: str
client: client,
lookback_days: int,
logs_group: str,
filter_pattern: Optional[str] = None,
) -> Iterator[str]:

logger.info(f"Collecting query log from cloud watch for {lookback_days} days")
@@ -79,6 +82,8 @@
}
if next_token:
params["nextToken"] = next_token
if filter_pattern:
params["filterPattern"] = filter_pattern

response = client.filter_log_events(**params)

next_token = response["nextToken"] if "nextToken" in response else None
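For context, a minimal sketch of how a caller might pass the new filter_pattern through to CloudWatch. The boto3 session setup and the log group name are illustrative assumptions, and the example pattern is the one suggested in the README change below.

import boto3

from metaphor.common.aws import iterate_logs_from_cloud_watch

# Hypothetical caller: build a CloudWatch Logs client, then stream only the
# events matching the filter pattern for the last day.
client = boto3.Session(region_name="us-west-2").client("logs")

for message in iterate_logs_from_cloud_watch(
    client=client,
    lookback_days=1,
    logs_group="/aws/rds/instance/my-db/postgresql",  # assumed log group name
    filter_pattern="-root LOG",  # keep events containing "LOG" but not "root"
):
    print(message)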
15 changes: 10 additions & 5 deletions metaphor/common/sql/process_query/process_query.py
@@ -22,6 +22,15 @@ def _is_insert_values_into(expression: Expression) -> bool:
)


ALLOW_EXPRESSION_TYPES = (
exp.Alter,
exp.DDL,
exp.DML,
exp.Merge,
exp.Query,
)


def process_query(
sql: str,
data_platform: DataPlatform,
@@ -78,11 +87,7 @@ def process_query(
if config.ignore_command_statement and isinstance(expression, exp.Command):
return None

if (
isinstance(expression, exp.Show)
or isinstance(expression, exp.Set)
or isinstance(expression, exp.Rollback)
):
if not isinstance(expression, ALLOW_EXPRESSION_TYPES):
return None

if not config.redact_literals.enabled:
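As a standalone illustration of the new allowlist, here is a sketch that uses sqlglot directly rather than the connector's process_query entry point; the printed outcome depends on how sqlglot classifies each statement for the postgres dialect.

import sqlglot
from sqlglot import expressions as exp

# Same allowlist as in process_query.py: anything that is not an Alter, DDL,
# DML, Merge, or Query expression is filtered out.
ALLOW_EXPRESSION_TYPES = (exp.Alter, exp.DDL, exp.DML, exp.Merge, exp.Query)

for sql in ("SELECT a FROM t", "ROLLBACK", "SHOW search_path"):
    expression = sqlglot.parse_one(sql, dialect="postgres")
    allowed = isinstance(expression, ALLOW_EXPRESSION_TYPES)
    print(f"{type(expression).__name__}: {'kept' if allowed else 'dropped'}")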
6 changes: 6 additions & 0 deletions metaphor/postgresql/README.md
@@ -56,6 +56,12 @@ query_log:

# (Optional) Number of days of query logs to fetch. Defaults to 1. If 0, no query logs will be fetched.
lookback_days: <days>

# (Optional) Extract query durations. Requires the Postgres parameter `log_duration` to be enabled. Defaults to false.
log_duration_enabled: <bool>

# (Optional) CloudWatch log filter pattern; see https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
filter_pattern: <pattern> # For example: -root LOG

# (Optional) A list of users whose queries will be excluded from the log fetching.
excluded_usernames:
2 changes: 2 additions & 0 deletions metaphor/postgresql/config.py
@@ -45,6 +45,8 @@ class BasePostgreSQLRunConfig(BaseConfig):
@dataclass(config=ConnectorConfig)
class PostgreSQLQueryLogConfig(QueryLogConfig):
aws: Optional[AwsCredentials] = None
filter_pattern: Optional[str] = None
log_duration_enabled: bool = False
logs_group: Optional[str] = None


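A sketch of how the two new options map onto the config object when constructed in Python rather than YAML. The field values are placeholders, and the AwsCredentials arguments shown are only the ones visible in the tests (the class may require more, such as a region).

from metaphor.common.aws import AwsCredentials
from metaphor.postgresql.config import PostgreSQLQueryLogConfig

# Query-log settings roughly equivalent to the README snippet above: pull one
# day of logs, skip duration extraction, and pre-filter events in CloudWatch.
query_log_config = PostgreSQLQueryLogConfig(
    lookback_days=1,
    logs_group="group",          # CloudWatch log group name (placeholder)
    log_duration_enabled=False,  # Postgres log_duration is off
    filter_pattern="-root LOG",  # CloudWatch filter pattern (placeholder)
    aws=AwsCredentials(access_key_id="", secret_access_key=""),
)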
28 changes: 20 additions & 8 deletions metaphor/postgresql/extractor.py
@@ -412,18 +412,26 @@ def collect_query_logs(self) -> Iterator[QueryLog]:
and self._query_log_config.logs_group is not None
):
client = self._query_log_config.aws.get_session().client("logs")
lookback_days = self._query_log_config.lookback_days
logs_group = self._query_log_config.logs_group

for message in iterate_logs_from_cloud_watch(
client, lookback_days, logs_group
client=client,
lookback_days=self._query_log_config.lookback_days,
logs_group=self._query_log_config.logs_group,
filter_pattern=self._query_log_config.filter_pattern,
):
query_log = self._process_cloud_watch_log(message, previous_line_cache)
query_log = self._process_cloud_watch_log(
message,
previous_line_cache,
self._query_log_config.log_duration_enabled,
)
if query_log:
yield query_log

def _process_cloud_watch_log(
self, log: str, previous_line_cache: Dict[str, ParsedLog]
self,
log: str,
previous_line_cache: Dict[str, ParsedLog],
log_duration_enabled=True,
) -> Optional[QueryLog]:
"""
SQL statement and duration are in two consecutive records, for example:
@@ -461,9 +469,11 @@ def _process_cloud_watch_log(
previous_line = previous_line_cache.get(parsed.session)
previous_line_cache[parsed.session] = parsed

if not log_duration_enabled:
previous_line = parsed
# The second line must be: duration: <number> ms
# Skip logs without a previous line, as well as invalid logs
if (
elif (
not parsed.log_body[0].lstrip().startswith("duration")
or not previous_line
or len(previous_line.log_body) < 2
@@ -473,7 +483,7 @@
message_type = previous_line.log_body[0].lstrip()

# We only care about `statement` (simple query) and `execute` (extended query) messages
if not message_type.startswith("statement") or message_type.startswith(
if not message_type.startswith("statement") and not message_type.startswith(
"execute"
):
return None
@@ -484,7 +494,9 @@
query = ":".join(previous_line.log_body[1:]).lstrip()

# Extract duration from the current line
duration = self._extract_duration(parsed.log_body[1])
duration = (
self._extract_duration(parsed.log_body[1]) if log_duration_enabled else None
)

tll = extract_table_level_lineage(
sql=query,
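To make the two modes concrete, these are the two CloudWatch log shapes the extractor handles; the lines are copied from the tests, and the comments describe the intended behavior.

# With the Postgres parameter log_duration enabled, the statement and its
# duration arrive as two consecutive records for the same session, and the
# duration record is what triggers emitting a QueryLog (with duration set):
two_line_logs = [
    "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;",
    "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms",
]

# With log_duration disabled (log_duration_enabled: false), each statement or
# execute record is processed on its own and the resulting QueryLog has no
# duration:
single_line_logs = [
    "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: execute <unnamed>: SELECT * FROM schema.table",
]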
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.121"
version = "0.14.122"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
40 changes: 40 additions & 0 deletions tests/postgresql/expected_logs_execute.json
@@ -0,0 +1,40 @@
[
{
"_id": "POSTGRESQL:9f07fea1cd4a1223a3695b4865f40d82",
"defaultDatabase": "metaphor",
"platform": "POSTGRESQL",
"queryId": "9f07fea1cd4a1223a3695b4865f40d82",
"sources": [
{
"database": "metaphor",
"id": "DATASET~F68D8D6F1F49DA4605F13F20FD3CA883",
"schema": "schema",
"table": "table"
}
],
"sql": "SELECT * FROM schema.table",
"sqlHash": "9f07fea1cd4a1223a3695b4865f40d82",
"startTime": "2024-08-29T09:25:50+00:00",
"targets": [],
"userId": "metaphor"
},
{
"_id": "POSTGRESQL:06903bf0b96b1021e32edad5cf2b15f1",
"defaultDatabase": "metaphor",
"platform": "POSTGRESQL",
"queryId": "06903bf0b96b1021e32edad5cf2b15f1",
"sources": [
{
"database": "metaphor",
"id": "DATASET~1149097F32B03CEE64BB356B77701736",
"schema": "schema2",
"table": "table"
}
],
"sql": "SELECT * FROM schema2.table",
"sqlHash": "06903bf0b96b1021e32edad5cf2b15f1",
"startTime": "2024-08-29T09:25:50+00:00",
"targets": [],
"userId": "metaphor"
}
]
51 changes: 48 additions & 3 deletions tests/postgresql/test_extractor.py
@@ -69,6 +69,7 @@ def dummy_config(**args):
lookback_days=1,
logs_group="group",
excluded_usernames={"foo"},
log_duration_enabled=True,
aws=AwsCredentials(
access_key_id="",
secret_access_key="",
@@ -220,9 +221,13 @@ async def test_extractor(
mock_session.client.return_value = 1
mocked_get_session.return_value = mock_session

def mock_iterate_logs(a, b, c):
yield "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;"
yield "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms"
logs = [
"2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: SELECT x, y from schema.table;",
"2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms",
]

def mock_iterate_logs(**_):
yield from logs

mocked_iterate_logs.side_effect = mock_iterate_logs

@@ -274,3 +279,43 @@ def test_alter_rename():
type=None,
user_id="metaphor",
)


@patch("metaphor.common.aws.AwsCredentials.get_session")
@patch("metaphor.postgresql.extractor.iterate_logs_from_cloud_watch")
def test_collect_query_logs(
mocked_iterate_logs: MagicMock,
mocked_get_session: MagicMock,
test_root_dir: str,
):
mock_session = MagicMock()
mock_session.client.return_value = 1
mocked_get_session.return_value = mock_session

date = "2024-08-29 09:25:50 UTC"
host = "10.1.1.134(48507)"
user_db = "metaphor@metaphor"

logs = [
f"{date}:{host}:{user_db}:[615]:LOG: execute <unnamed>: BEGIN",
f"{date}:{host}:{user_db}:[615]:LOG: execute <unnamed>: SELECT * FROM schema.table",
f"{date}:{host}:{user_db}:[615]:LOG: execute S_1: COMMIT",
f"{date}:{host}:{user_db}:[616]:LOG: execute <unnamed>: BEGIN",
f"{date}:{host}:{user_db}:[616]:LOG: execute <unnamed>: SELECT * FROM schema2.table",
f"{date}:{host}:{user_db}:[616]:LOG: execute S_1: COMMIT",
]

def mock_iterate_logs(**_):
yield from logs

mocked_iterate_logs.side_effect = mock_iterate_logs

config = dummy_config()
assert config.query_log
config.query_log.log_duration_enabled = False

extractor = PostgreSQLExtractor(config)
log_events = [EventUtil.trim_event(e) for e in extractor.collect_query_logs()]
assert log_events == load_json(
f"{test_root_dir}/postgresql/expected_logs_execute.json"
)