Skip to content

Commit

Permalink
Fix Snowflake rows read & written extraction (#606)
Browse files Browse the repository at this point in the history
  • Loading branch information
mars-lan authored Sep 27, 2023
1 parent 6565f5e commit 3d8b1be
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
25 changes: 19 additions & 6 deletions metaphor/snowflake/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ def _batch_query_for_access_logs(
x: QueryWithParam(
f"""
SELECT q.QUERY_ID, q.USER_NAME, QUERY_TEXT, START_TIME, TOTAL_ELAPSED_TIME, CREDITS_USED_CLOUD_SERVICES,
DATABASE_NAME, SCHEMA_NAME, BYTES_SCANNED, BYTES_WRITTEN, ROWS_PRODUCED,
DATABASE_NAME, SCHEMA_NAME, BYTES_SCANNED, BYTES_WRITTEN, ROWS_PRODUCED, ROWS_INSERTED, ROWS_UPDATED,
DIRECT_OBJECTS_ACCESSED, OBJECTS_MODIFIED
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q
JOIN SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY a
Expand Down Expand Up @@ -471,7 +471,7 @@ def _batch_query_for_query_logs(
x: QueryWithParam(
f"""
SELECT QUERY_ID, USER_NAME, QUERY_TEXT, START_TIME, TOTAL_ELAPSED_TIME, CREDITS_USED_CLOUD_SERVICES,
DATABASE_NAME, SCHEMA_NAME, BYTES_SCANNED, BYTES_WRITTEN, ROWS_PRODUCED
DATABASE_NAME, SCHEMA_NAME, BYTES_SCANNED, BYTES_WRITTEN, ROWS_PRODUCED, ROWS_INSERTED, ROWS_UPDATED
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY q
WHERE EXECUTION_STATUS = 'SUCCESS'
AND START_TIME > %s AND START_TIME <= %s
Expand Down Expand Up @@ -503,6 +503,8 @@ def _parse_query_logs(self, batch_number: str, query_logs: List[Tuple]) -> None:
bytes_scanned,
bytes_written,
rows_produced,
rows_inserted,
rows_updated,
*access_objects,
) in query_logs:
try:
Expand All @@ -524,13 +526,24 @@ def _parse_query_logs(self, batch_number: str, query_logs: List[Tuple]) -> None:
account=self._account,
start_time=start_time,
duration=float(elapsed_time / 1000.0),
cost=float(credit) if credit else None,
cost=float(credit) if credit is not None else None,
user_id=username,
default_database=default_database,
default_schema=default_schema,
rows_written=float(rows_produced) if rows_produced else None,
bytes_read=float(bytes_scanned) if bytes_scanned else None,
bytes_written=float(bytes_written) if bytes_written else None,
rows_read=float(rows_produced)
if rows_produced is not None
else None,
rows_written=float(rows_inserted)
if rows_inserted is not None
else float(rows_updated)
if rows_updated is not None
else None,
bytes_read=float(bytes_scanned)
if bytes_scanned is not None
else None,
bytes_written=float(bytes_written)
if bytes_written is not None
else None,
sources=sources,
targets=targets,
sql=query_text,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.12.51"
version = "0.12.52"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
12 changes: 9 additions & 3 deletions tests/snowflake/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,11 @@ def test_parse_query_logs(mock_connect: MagicMock):
"0.000296",
"ACME",
"RIDE_SHARE",
131072,
86016,
3868,
100, # BYTES_SCANNED
200, # BYTES_WRITTEN
10, # ROWS_PRODUCED
20, # ROWS_INSERTED
0, # ROWS_UPDATED
json.dumps(
[
{
Expand Down Expand Up @@ -569,6 +571,10 @@ def test_parse_query_logs(mock_connect: MagicMock):
assert len(extractor._logs) == 1
log0 = extractor._logs[0]
assert log0.query_id == "id1"
assert log0.bytes_read == 100
assert log0.bytes_written == 200
assert log0.rows_read == 10
assert log0.rows_written == 20
assert log0.sources == [
QueriedDataset(
id="DATASET~965CB9D50FF7E59D766536D8ED07E862",
Expand Down

0 comments on commit 3d8b1be

Please sign in to comment.