Fill sources field for Oracle query log extractor
elic-eon committed Aug 21, 2024
1 parent f20eb67 commit 82c8ff0
Showing 8 changed files with 149 additions and 54 deletions.
1 change: 1 addition & 0 deletions metaphor/common/sql/dialect.py
```diff
@@ -9,6 +9,7 @@
     DataPlatform.REDSHIFT: "redshift",
     DataPlatform.SNOWFLAKE: "snowflake",
     DataPlatform.UNITY_CATALOG: "databricks",
+    DataPlatform.ORACLE: "oracle",
 }
 """
 Mapping from data platforms supported by Metaphor to dialects recognized by SQLGlot.
```
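For context, this mapping feeds straight into SQLGlot's parser. A hedged sketch of how a platform-to-dialect lookup like this is consumed (`PLATFORM_TO_DIALECT` is a stand-in name; the hunk does not show the repo's actual variable):

```python
import sqlglot

PLATFORM_TO_DIALECT = {"ORACLE": "oracle"}  # stand-in for the mapping above

# Parse an Oracle statement using the dialect name from the lookup.
expression = sqlglot.parse_one(
    "SELECT SYSDATE FROM dual",
    read=PLATFORM_TO_DIALECT["ORACLE"],
)
print(expression.sql(dialect="oracle"))  # round-trips as Oracle SQL
```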
16 changes: 10 additions & 6 deletions metaphor/common/sql/table_level_lineage/table.py
```diff
@@ -22,17 +22,21 @@ def from_sqlglot_table(cls, table: exp.Table):
         )
 
     def to_queried_dataset(
-        self, platform: DataPlatform, account: Optional[str], database: Optional[str]
+        self,
+        platform: DataPlatform,
+        account: Optional[str],
+        default_database: Optional[str] = None,
+        default_schema: Optional[str] = None,
     ):
+        schema = self.schema or default_schema
+        database = self.db or default_database
         return QueriedDataset(
-            database=self.db,
-            schema=self.schema,
+            database=database,
+            schema=schema,
             table=self.table,
             id=str(
                 to_dataset_entity_id(
-                    dataset_normalized_name(
-                        self.db or database, self.schema, self.table
-                    ),
+                    dataset_normalized_name(database, schema, self.table),
                     platform,
                     account,
                 )
```
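The new fallback only fills parts that the parsed table reference leaves empty. A minimal sketch of that behavior using sqlglot directly (the literals "db"/"dev" stand in for default_database/default_schema):

```python
import sqlglot
from sqlglot import exp

# An unqualified table reference parsed with the Oracle dialect.
table = sqlglot.parse_one("SELECT x FROM table_5566", read="oracle").find(exp.Table)

# sqlglot returns "" for missing qualifiers, so `or` applies the defaults.
database = table.catalog or "db"  # default_database
schema = table.db or "dev"        # default_schema
print(database, schema, table.name)  # db dev table_5566
```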
11 changes: 8 additions & 3 deletions metaphor/common/sql/table_level_lineage/table_level_lineage.py
```diff
@@ -134,7 +134,8 @@ def extract_table_level_lineage(
     account: Optional[str],
     statement_type: Optional[str] = None,
     query_id: Optional[str] = None,
-    database: Optional[str] = None,
+    default_database: Optional[str] = None,
+    default_schema: Optional[str] = None,
 ) -> Result:
 
     if statement_type and statement_type.upper() not in _VALID_STATEMENT_TYPES:
@@ -153,11 +154,15 @@
     try:
         return Result(
             targets=[
-                target.to_queried_dataset(platform, account, database)
+                target.to_queried_dataset(
+                    platform, account, default_database, default_schema
+                )
                 for target in _find_targets(expression)
            ],
            sources=[
-                source.to_queried_dataset(platform, account, database)
+                source.to_queried_dataset(
+                    platform, account, default_database, default_schema
+                )
                 for source in _find_sources(expression)
             ],
         )
```
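With the new parameters, a caller can qualify unqualified tables in a single call. A hedged usage sketch, assuming DataPlatform lives in metaphor.models.metadata_change_event as elsewhere in this commit (the INSERT statement and the "db"/"dev" defaults are illustrative):

```python
from metaphor.common.sql.table_level_lineage.table_level_lineage import (
    extract_table_level_lineage,
)
from metaphor.models.metadata_change_event import DataPlatform

result = extract_table_level_lineage(
    sql="INSERT INTO target_t SELECT * FROM source_t",
    platform=DataPlatform.ORACLE,
    account=None,
    default_database="db",
    default_schema="dev",
)
# Unqualified tables pick up the defaults when normalized,
# i.e. db.dev.source_t as a source and db.dev.target_t as a target.
print([s.table for s in result.sources], [t.table for t in result.targets])
```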
2 changes: 1 addition & 1 deletion metaphor/informatica/extractor.py
```diff
@@ -186,7 +186,7 @@ def trans_source(source: MappingParameter) -> List[str]:
         sql=source.customQuery,
         platform=get_platform(connection),
         account=get_account(connection),
-        database=connection.database,
+        default_database=connection.database,
     )
     return [dataset.id for dataset in result.sources]
 
```
112 changes: 83 additions & 29 deletions metaphor/oracle/extractor.py
```diff
@@ -1,11 +1,14 @@
 from typing import Collection, Iterator, List
 
-from sqlalchemy import Inspector, text
+from sqlalchemy import Connection, Inspector, text
 
 from metaphor.common.entity_id import dataset_normalized_name
 from metaphor.common.event_util import ENTITY_TYPES
 from metaphor.common.logger import get_logger
-from metaphor.common.utils import md5_digest, start_of_day, to_utc_time
+from metaphor.common.sql.table_level_lineage.table_level_lineage import (
+    extract_table_level_lineage,
+)
+from metaphor.common.utils import md5_digest, safe_float, start_of_day, to_utc_time
 from metaphor.database.extractor import GenericDatabaseExtractor
 from metaphor.models.crawler_run_metadata import Platform
 from metaphor.models.metadata_change_event import (
@@ -175,6 +178,56 @@ def _extract_ddl(self, inspector: Inspector):
         assert dataset.schema and dataset.schema.sql_schema
         dataset.schema.sql_schema.table_schema = ddl
 
+    def _inner_fetch_query_logs(
+        self, sql: str, connection: Connection
+    ) -> List[QueryLog]:
+        cursor = connection.execute(text(sql))
+
+        rows = cursor.fetchall()
+        logs: List[QueryLog] = []
+        for (
+            user,
+            query,
+            start,
+            duration,
+            query_id,
+            read_bytes,
+            write_bytes,
+            row_count,
+        ) in rows:
+            schema = user.lower() if user else None
+            database = self._database if self._database else None
+
+            ttl = extract_table_level_lineage(
+                query,
+                platform=DataPlatform.ORACLE,
+                account=self._alternative_host or self._config.host,
+                query_id=query_id,
+                default_database=database,
+                default_schema=schema,
+            )
+
+            logs.append(
+                QueryLog(
+                    id=f"{DataPlatform.ORACLE.name}:{query_id}",
+                    query_id=query_id,
+                    platform=DataPlatform.ORACLE,
+                    default_database=database,
+                    default_schema=schema,
+                    user_id=user,
+                    sql=query,
+                    sql_hash=md5_digest(query.encode("utf-8")),
+                    duration=float(duration),
+                    start_time=to_utc_time(start),
+                    bytes_read=safe_float(read_bytes),
+                    bytes_written=safe_float(write_bytes),
+                    sources=ttl.sources,
+                    rows_read=safe_float(row_count),
+                    targets=ttl.targets,
+                )
+            )
+        return logs
+
     def _extract_query_logs(self, inspector: Inspector, excluded_users: List[str]):
         start_time = start_of_day(
             daysAgo=self._query_logs_config.lookback_days
@@ -183,33 +236,34 @@ def _extract_query_logs(self, inspector: Inspector, excluded_users: List[str]):
         users = [f"'{user.upper()}'" for user in excluded_users]
 
         with inspector.engine.connect() as connection:
+            offset = 0
             result_limit = 1000
             filters = f"""AND PARSING_SCHEMA_NAME not in ({','.join(users)})"""
 
-            sql = f"""
-            SELECT
-                PARSING_SCHEMA_NAME,
-                SQL_FULLTEXT AS query_text,
-                TO_TIMESTAMP(FIRST_LOAD_TIME, 'yy-MM-dd/HH24:MI:SS') AS start_time,
-                ELAPSED_TIME / 1000 AS duration,
-                SQL_ID
-            FROM gv$sql
-            WHERE OBJECT_STATUS = 'VALID'
-            {filters}
-            AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'yy-MM-dd/HH24:MI:SS') >= TO_TIMESTAMP('{start_time}', 'yy-MM-dd HH24:MI:SS')
-            ORDER BY FIRST_LOAD_TIME DESC
-            OFFSET 0 ROWS FETCH NEXT {result_limit} ROWS ONLY
-            """
-
-            cursor = connection.execute(text(sql))
-            for user, query, start, duration, query_id in cursor:
-                yield QueryLog(
-                    id=f"{DataPlatform.ORACLE.name}:{query_id}",
-                    query_id=query_id,
-                    platform=DataPlatform.ORACLE,
-                    user_id=user,
-                    sql=query,
-                    sql_hash=md5_digest(query.encode("utf-8")),
-                    duration=float(duration),
-                    start_time=to_utc_time(start),
-                )
+            while True:
+                sql = f"""
+                SELECT
+                    PARSING_SCHEMA_NAME,
+                    SQL_FULLTEXT AS query_text,
+                    TO_TIMESTAMP(FIRST_LOAD_TIME, 'yy-MM-dd/HH24:MI:SS') AS start_time,
+                    ELAPSED_TIME / 1000 AS duration,
+                    SQL_ID,
+                    PHYSICAL_READ_BYTES,
+                    PHYSICAL_WRITE_BYTES,
+                    ROWS_PROCESSED
+                FROM gv$sql
+                WHERE OBJECT_STATUS = 'VALID'
+                {filters}
+                AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'yy-MM-dd/HH24:MI:SS') >= TO_TIMESTAMP('{start_time}', 'yy-MM-dd HH24:MI:SS')
+                ORDER BY FIRST_LOAD_TIME DESC
+                OFFSET {offset} ROWS FETCH NEXT {offset + result_limit} ROWS ONLY
+                """
+                logs = self._inner_fetch_query_logs(sql, connection)
+                for log in logs:
+                    yield log
+
+                logger.info(f"Fetched {len(logs)} query logs")
+
+                if len(logs) < result_limit:
+                    break
+                offset += result_limit
```
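The loop's termination rule, stopping on the first page shorter than the limit, is independent of the SQL. A generic sketch of that short-page pattern, not a line-for-line copy of the loop above (fetch_page is a hypothetical callable that runs one OFFSET/FETCH page):

```python
from typing import Callable, Iterator, List

def paginate(
    fetch_page: Callable[[int, int], List[tuple]], result_limit: int = 1000
) -> Iterator[tuple]:
    """Yield rows page by page; stop on the first page shorter than the limit."""
    offset = 0
    while True:
        page = fetch_page(offset, result_limit)  # e.g. OFFSET/FETCH NEXT in SQL
        yield from page
        if len(page) < result_limit:
            break  # a short page means the result set is exhausted
        offset += result_limit
```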
```diff
@@ -102,3 +102,20 @@ def test_select_from_unnest_with_ordinality(platform: DataPlatform):
     CROSS JOIN UNNEST(numbers) WITH ORDINALITY AS t (n, a);
     """
     assert_table_lineage_equal(sql, None, None, platform=platform)
+
+
+@pytest.mark.parametrize("platform", [DataPlatform.ORACLE])
+def test_oracle_select_statement(platform: DataPlatform):
+    assert_table_lineage_equal(
+        "/* **** 60 */\nwith ss as (\n select\n i_item_id,sum(ss_ext_sales_price) total_sales\n from\n \tstore_sales,\n \tdate_dim,\n customer_address,\n item\n where\n i_item_id in (select\n i_item_id\nfrom\n item\nwhere i_category in ('Children'))\n and ss_item_sk = i_item_sk\n and ss_sold_date_sk = d_date_sk\n and d_year = 2000\n and d_moy = 8\n and ss_addr_sk = ca_address_sk\n and ca_gmt_offset = -7\n group by i_item_id),\n cs as (\n select\n i_item_id,sum(cs_ext_sales_price) total_sales\n from\n \tcatalog_sales,\n \tdate_dim,\n customer_address,\n item\n where\n i_item_id in (select\n i_item_id\nfrom\n item\nwhere i_category in ('Children'))\n and cs_item_sk = i_item_sk\n and cs_sold_date_sk = d_date_sk\n and d_year = 2000\n and d_moy = 8\n and cs_bill_addr_sk = ca_address_sk\n and ca_gmt_offset = -7\n group by i_item_id),\n ws as (\n select\n i_item_id,sum(ws_ext_sales_price) total_sales\n from\n \tweb_sales,\n \tdate_dim,\n customer_address,\n item\n where\n i_item_id in (select\n i_item_id\nfrom\n item\nwhere i_category in ('Children'))\n and ws_item_sk = i_item_sk\n and ws_sold_date_sk = d_date_sk\n and d_year = 2000\n and d_moy = 8\n and ws_bill_addr_sk = ca_address_sk\n and ca_gmt_offset = -7\n group by i_item_id)\n select * from ( select\n i_item_id\n,sum(total_sales) total_sales\n from (select * from ss\n union all\n select * from cs\n union all\n select * from ws) tmp1\n group by i_item_id\n order by i_item_id\n ,total_sales\n ) where rownum <= 100",
+        {
+            "catalog_sales",
+            "customer_address",
+            "date_dim",
+            "item",
+            "store_sales",
+            "web_sales",
+        },
+        set(),
+        platform=platform,
+    )
```
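At a smaller scale, the same source-table extraction can be reproduced with sqlglot alone; a hedged sketch, not the repo's assert helper:

```python
import sqlglot
from sqlglot import exp

# Oracle-style ROWNUM pagination around a subquery, as in the test above.
sql = "select * from (select i_item_id from item order by i_item_id) where rownum <= 100"
tables = {t.name for t in sqlglot.parse_one(sql, read="oracle").find_all(exp.Table)}
print(tables)  # {'item'}: only real tables count, not the derived subquery
```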
20 changes: 17 additions & 3 deletions tests/oracle/expected_query_logs.json
```diff
@@ -4,9 +4,23 @@
     "duration": 10.0,
     "platform": "ORACLE",
     "queryId": "sql-id",
-    "sql": "SELECT...",
     "startTime": "2024-07-30T15:31:33+00:00",
-    "sqlHash": "191df6d782898cbb739c413fa5868422",
-    "userId": "DEV"
+    "userId": "DEV",
+    "bytesRead": 1024.0,
+    "bytesWritten": 0.0,
+    "defaultDatabase": "db",
+    "defaultSchema": "dev",
+    "rowsRead": 20.0,
+    "sources": [
+      {
+        "database": "db",
+        "id": "DATASET~FB036B29A2F624861C86A3D4237DF75F",
+        "schema": "dev",
+        "table": "TABLE_5566"
+      }
+    ],
+    "sql": "SELECT x, y FROM TABLE_5566",
+    "sqlHash": "a5bfd703435ab791044cc28af8999abf",
+    "targets": []
   }
 ]
```
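The fixture's sqlHash is a 32-character hex string, consistent with a plain MD5 digest of the SQL text. A minimal sketch, assuming metaphor.common.utils.md5_digest is an MD5 hex digest (an assumption; the helper's body is not shown in this commit):

```python
import hashlib

sql = "SELECT x, y FROM TABLE_5566"
# Assumption: md5_digest(data) == hashlib.md5(data).hexdigest()
sql_hash = hashlib.md5(sql.encode("utf-8")).hexdigest()
print(sql_hash)  # should equal the fixture's sqlHash if the assumption holds
```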
24 changes: 12 additions & 12 deletions tests/oracle/test_extractor.py
```diff
@@ -119,18 +119,18 @@ def mock_connection():
             [("mview",)],  # extract_mviews_names, prod
             [("TABLE1", "DEV", "DDL ......")],  # extract_ddl
             [("SYS",)],  # get_system_users
-        ]
-    )
-
-    cursor.__iter__.return_value = iter(
-        [
-            (
-                "DEV",
-                "SELECT...",
-                datetime.fromtimestamp(1722353493, tz=timezone.utc),
-                10.0,
-                "sql-id",
-            )
+            [
+                (
+                    "DEV",
+                    "SELECT x, y FROM TABLE_5566",
+                    datetime.fromtimestamp(1722353493, tz=timezone.utc),
+                    10.0,
+                    "sql-id",
+                    1024.0,
+                    0.0,
+                    20,
+                )
+            ],
         ]
     )
 
```