Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect all valid query history #975

Merged
merged 3 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion metaphor/common/sql/process_query/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,18 @@ class ProcessQueryConfig:
If this is set to `True`, when Sqlglot fails to parse a query we skip it from the collected MCE.
"""

ignore_command_statement: bool = True
"""
Skip statements that Sqlglot parses as a generic command (`exp.Command`) rather than a query, e.g. `CREATE USER`.
"""

@property
def should_process(self) -> bool:
    """
    Whether we should run the processing method at all.

    True when any of `redact_literals.enabled`, `ignore_insert_values_into`
    or `ignore_command_statement` is set; when all of them are off there is
    nothing for the processing step to do.
    """
    # A single return covering every option: an earlier return that only
    # checked the first two flags would make `ignore_command_statement`
    # unreachable.
    return (
        self.redact_literals.enabled
        or self.ignore_insert_values_into
        or self.ignore_command_statement
    )
5 changes: 4 additions & 1 deletion metaphor/common/sql/process_query/process_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ def process_query(
if config.ignore_insert_values_into and _is_insert_values_into(expression):
return None

if config.ignore_command_statement and isinstance(expression, exp.Command):
return None

if not config.redact_literals.enabled:
return expression.sql(dialect=dialect)
return updated
elic-eon marked this conversation as resolved.
Show resolved Hide resolved

DialectClass: t.Type[Dialect]
if dialect is None:
Expand Down
4 changes: 0 additions & 4 deletions metaphor/postgresql/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,6 @@ def _process_cloud_watch_log(
default_database=parsed.database,
)

# Skip SQL statement that is not related to any table
if not tll.sources and not tll.targets:
return None

# Skip if parsed sources or targets has invalid data.
if not is_valid_queried_datasets(tll.sources) or not is_valid_queried_datasets(
tll.targets
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.98"
version = "0.14.99"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
6 changes: 5 additions & 1 deletion tests/common/sql/process_query/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ def test_config():
assert config.should_process

config = ProcessQueryConfig()
assert config.should_process

config = ProcessQueryConfig(ignore_command_statement=False)
assert not config.should_process

config = ProcessQueryConfig(
redact_literals=RedactPIILiteralsConfig(where_clauses=True)
redact_literals=RedactPIILiteralsConfig(where_clauses=True),
ignore_command_statement=False,
)
assert not config.should_process
19 changes: 11 additions & 8 deletions tests/common/sql/process_query/test_process_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@
ignore_insert_values_into=True,
)

pre_preprocess_only = ProcessQueryConfig()


@pytest.mark.parametrize(
["name", "platform"],
["name", "platform", "config"],
[
("redact_literal_values_in_where_clauses", DataPlatform.SNOWFLAKE),
("redact_simple", DataPlatform.SNOWFLAKE),
("snowflake_copy_into", DataPlatform.SNOWFLAKE),
("redact_literals_in_where_in", DataPlatform.MSSQL),
("redact_literals_in_where_or", DataPlatform.MSSQL),
("redact_merge_insert_when_not_matched", DataPlatform.MSSQL),
("redact_literal_values_in_where_clauses", DataPlatform.SNOWFLAKE, config),
("redact_simple", DataPlatform.SNOWFLAKE, config),
("snowflake_copy_into", DataPlatform.SNOWFLAKE, config),
("snowflake_copy_into", DataPlatform.SNOWFLAKE, pre_preprocess_only),
("redact_literals_in_where_in", DataPlatform.MSSQL, config),
("redact_literals_in_where_or", DataPlatform.MSSQL, config),
("redact_merge_insert_when_not_matched", DataPlatform.MSSQL, config),
],
)
def test_process_query(name: str, platform: DataPlatform):
def test_process_query(name: str, platform: DataPlatform, config: ProcessQueryConfig):
dir = Path(__file__).parent / "process_query" / name
with open(dir / "query.sql") as f:
sql = f.read()
Expand Down
44 changes: 43 additions & 1 deletion tests/postgresql/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def test_process_cloud_watch_log():
== gold
)

# SQL don't have source and targets
# Skip commands
assert (
extractor._process_cloud_watch_log(
"2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: CREATE USER abc PASSWORD 'asjdkasjd';",
Expand Down Expand Up @@ -243,3 +243,45 @@ async def test_extractor(mocked_fetch_databases):
extractor = PostgreSQLExtractor(dummy_config())
events = [e for e in await extractor.extract()]
assert events == []


def test_alter_rename():
    """
    Feed an `ALTER TABLE ... RENAME TO` statement line and its duration line
    through `_process_cloud_watch_log`, sharing one cache between the calls.

    The statement line on its own must produce nothing; the duration line
    that follows must produce the query log, with empty sources and targets.
    """
    pg_extractor = PostgreSQLExtractor(dummy_config())
    log_cache = {}

    # Valid statement: the statement line alone yields no query log.
    statement_result = pg_extractor._process_cloud_watch_log(
        "2024-08-29 09:25:50 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: statement: ALTER TABLE schema.table_tmp RENAME TO table;",
        log_cache,
    )
    assert statement_result is None

    # The duration line, processed with the same cache, yields the query log.
    duration_result = pg_extractor._process_cloud_watch_log(
        "2024-08-29 09:25:51 UTC:10.1.1.134(48507):metaphor@metaphor:[615]:LOG: duration: 55.66 ms",
        log_cache,
    )
    expected_log = QueryLog(
        id="POSTGRESQL:e6c7e28a652b40dc29c1a5ed5c1b1f16",
        account=None,
        bytes_read=None,
        bytes_written=None,
        cost=None,
        default_database="metaphor",
        default_schema=None,
        duration=55.66,
        email=None,
        metadata=None,
        parsing=None,
        platform=DataPlatform.POSTGRESQL,
        query_id="e6c7e28a652b40dc29c1a5ed5c1b1f16",
        rows_read=None,
        rows_written=None,
        sources=[],
        sql="ALTER TABLE schema.table_tmp RENAME TO table;",
        sql_hash="e6c7e28a652b40dc29c1a5ed5c1b1f16",
        start_time=datetime(2024, 8, 29, 9, 25, 50, tzinfo=timezone.utc),
        targets=[],
        type=None,
        user_id="metaphor",
    )
    assert duration_result == expected_log
Loading