Skip to content

Commit

Permalink
Merge branch 'main' into notion-date-validator
Browse files Browse the repository at this point in the history
  • Loading branch information
rishimo committed Oct 8, 2024
2 parents cf7cea0 + 11b754e commit 4fa043e
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 208 deletions.
2 changes: 1 addition & 1 deletion metaphor/dbt/cloud/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, config: DbtCloudConfig):
url=self._discovery_api_url,
headers=headers,
http_client=httpx.Client(
timeout=None,
timeout=30,
headers=headers,
transport=LogTransport(httpx.HTTPTransport()),
),
Expand Down
23 changes: 15 additions & 8 deletions metaphor/postgresql/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,15 +436,19 @@ def _process_cloud_watch_log(
return None

parsed = parse_postgres_log(log)
if parsed is None:
return None

# Skip log without user and database, and log_level of SQL statement is "LOG"
if (
parsed is None
or not parsed.user
or not parsed.database
or parsed.log_level != "LOG"
or parsed.user in self._query_log_config.excluded_usernames
):
if not parsed.user or not parsed.database:
logger.debug(f"Invalid user or database, log: {log}")
return None

if parsed.log_level != "LOG":
logger.debug(f"Skip {parsed.log_level} log: {log}")
return None

if parsed.user in self._query_log_config.excluded_usernames:
logger.debug(f"Skip {parsed}'s query")
return None

previous_line = previous_line_cache.get(parsed.session)
Expand All @@ -467,6 +471,8 @@ def _process_cloud_watch_log(
):
return None

logger.debug(f"processing valid query: {log}")

# Extract sql from the previous line
query = ":".join(previous_line.log_body[1:]).lstrip()

Expand All @@ -484,6 +490,7 @@ def _process_cloud_watch_log(
if not is_valid_queried_datasets(tll.sources) or not is_valid_queried_datasets(
tll.targets
):
logger.debug(f"invalid sources/targets, log: {log}")
return None

sql_hash = md5_digest(query.encode("utf-8"))
Expand Down
6 changes: 0 additions & 6 deletions metaphor/redshift/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,6 @@ def _process_record(self, access_event: AccessEvent):
default_database=access_event.database,
)

if not (
self._is_related_query_log(tll.sources)
or self._is_related_query_log(tll.targets)
):
return

sql: Optional[str] = access_event.querytxt
if self._query_log_config.process_query.should_process:
sql = process_query(
Expand Down
Loading

0 comments on commit 4fa043e

Please sign in to comment.