Support Redshift serverless #960

Merged
6 changes: 3 additions & 3 deletions metaphor/oracle/extractor.py

@@ -198,7 +198,7 @@ def _inner_fetch_query_logs(
             schema = user.lower() if user else None
             database = self._database if self._database else None

-            ttl = extract_table_level_lineage(
+            tll = extract_table_level_lineage(
                 query,
                 platform=DataPlatform.ORACLE,
                 account=self._alternative_host or self._config.host,
@@ -221,9 +221,9 @@ def _inner_fetch_query_logs(
                     start_time=to_utc_time(start),
                     bytes_read=safe_float(read_bytes),
                     bytes_written=safe_float(write_bytes),
-                    sources=ttl.sources,
+                    sources=tll.sources,
                     rows_read=safe_float(row_count),
-                    targets=ttl.targets,
+                    targets=tll.targets,
                 )
             )
         return logs
7 changes: 5 additions & 2 deletions metaphor/redshift/README.md

@@ -12,11 +12,14 @@ Use the following command to grant the permission:
 # Create a new user called "metaphor"
 CREATE USER metaphor PASSWORD <password>;

-# Grant minimally required privleages to the user
+# Grant minimally required privileges to the user
 GRANT SELECT ON pg_catalog.svv_table_info TO metaphor;

-# Grant access to syslog "STL_SCAN" and "STL_QUERY"
+# Grant access to syslog "SYS_*"
 ALTER USER metaphor WITH SYSLOG ACCESS UNRESTRICTED;
+
+# Grant access to "PG_USER_INFO"
+GRANT SELECT ON pg_catalog.pg_user_info TO metaphor;
 ```

 > Note: If the Redshift cluster contains more than one database, you must grant the permission in all databases. Alternatively, you can limit the connector to a subset of databases using the [filter configuration](../common/docs/filter.md).
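A quick way to verify the grants is to connect as the new user and probe each object the connector reads. This is a minimal sketch, not part of the PR — the `redshift_connector` driver choice and the endpoint/credential values are illustrative assumptions:

```python
import redshift_connector  # AWS's Python driver; any Redshift client works

# Hypothetical connection details -- substitute your own endpoint.
conn = redshift_connector.connect(
    host="default-workgroup.012345678901.us-east-1.redshift-serverless.amazonaws.com",
    database="dev",
    user="metaphor",
    password="<password>",
)

cursor = conn.cursor()
# Each statement raises a permission error if the corresponding grant is missing.
cursor.execute("SELECT COUNT(*) FROM pg_catalog.svv_table_info")
cursor.execute("SELECT COUNT(*) FROM sys_query_history")
cursor.execute("SELECT COUNT(*) FROM pg_catalog.pg_user_info")
```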
63 changes: 25 additions & 38 deletions metaphor/redshift/access_event.py

@@ -8,12 +8,12 @@
 WITH queries AS (
   SELECT
     *,
-    SUM(LEN(text)) OVER (PARTITION BY query) AS length
+    SUM(LEN(text)) OVER (PARTITION BY query_id) AS length
   FROM
-    stl_querytext
+    sys_query_text
 ), filtered AS (
   SELECT
-    q.query,
+    q.query_id,
     LISTAGG(
       CASE
         WHEN LEN(RTRIM(q.text)) = 0 THEN q.text
@@ -26,31 +26,25 @@
   WHERE
     q.length < 65536
   GROUP BY
-    q.query
+    q.query_id
 )
 SELECT DISTINCT
-  ss.userid,
-  ss.query,
-  sui.usename,
-  ss.rows,
-  ss.bytes,
-  ss.tbl,
+  sqh.user_id,
+  sqh.query_id,
+  pu.usename,
+  sqh.returned_rows AS rows,
+  sqh.returned_bytes AS bytes,
   q.querytxt,
-  sti.database,
-  sti.schema,
-  sti.table,
-  sq.starttime,
-  sq.endtime,
-  sq.aborted
-FROM stl_scan ss
-JOIN svv_table_info sti ON ss.tbl = sti.table_id
-JOIN stl_query sq ON ss.query = sq.query
-JOIN filtered q ON ss.query = q.query
-JOIN svl_user_info sui ON sq.userid = sui.usesysid
-WHERE ss.starttime >= '{start_time}'
-  AND ss.starttime < '{end_time}'
-  AND sq.aborted = 0
-ORDER BY ss.endtime DESC;
+  trim(sqh.database_name) AS database,
+  sqh.start_time,
+  sqh.end_time
+FROM sys_query_history sqh
+JOIN filtered q ON sqh.query_id = q.query_id
+JOIN pg_user_info pu ON sqh.user_id = pu.usesysid
+WHERE sqh.status = 'success'
+  AND sqh.start_time >= '{start_time}'
+  AND sqh.start_time < '{end_time}'
+ORDER BY sqh.end_time DESC;
 """
 """
 The condition `length < 65536` is because Redshift's LISTAGG method
@@ -61,19 +55,15 @@

 @dataclass(frozen=True)
 class AccessEvent:
-    userid: int
-    query: int
+    user_id: int
+    query_id: int
     usename: str
-    tbl: int
     rows: int
     bytes: int
     querytxt: str
     database: str
-    schema: str
-    table: str
-    starttime: datetime
-    endtime: datetime
-    aborted: int
+    end_time: datetime
+    start_time: datetime

     @staticmethod
     def from_record(record: Record) -> "AccessEvent":
@@ -87,16 +77,13 @@
             if isinstance(v, str):
                 record[k] = v.strip()

-        record["starttime"] = record["starttime"].replace(tzinfo=timezone.utc)
-        record["endtime"] = record["endtime"].replace(tzinfo=timezone.utc)
+        record["start_time"] = record["start_time"].replace(tzinfo=timezone.utc)
+        record["end_time"] = record["end_time"].replace(tzinfo=timezone.utc)

         record["querytxt"] = record["querytxt"].replace("\\n", "\n")

         return AccessEvent(**record)

-    def table_name(self) -> str:
-        return f"{self.database}.{self.schema}.{self.table}"
-
     @staticmethod
     async def fetch_access_event(
         conn: Connection,
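The switch from the provisioned-only `STL_*`/`SVL_*` system tables to the `SYS_*` monitoring views (`sys_query_text`, `sys_query_history`) is what makes the crawler work on Redshift Serverless as well as provisioned clusters; the per-table `stl_scan` join disappears along with the `schema`/`table` fields. A minimal sketch of the renamed record shape — the dict and its values are hypothetical, standing in for the database `Record` used in production, and it assumes `from_record` accepts any mutable mapping, as the visible lines suggest:

```python
import datetime

# Hypothetical row shaped like the output of the new SYS_QUERY_HISTORY query.
record = {
    "user_id": 100,
    "query_id": 42,
    "usename": "analyst ",  # trailing whitespace gets stripped by from_record
    "rows": 10,
    "bytes": 2048,
    "querytxt": "select * from schema1.table1",
    "database": "db1",
    "start_time": datetime.datetime(2024, 1, 1, 0, 0, 0),
    "end_time": datetime.datetime(2024, 1, 1, 0, 0, 5),
}

event = AccessEvent.from_record(record)
assert event.start_time.tzinfo is not None  # timestamps normalized to UTC
```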
62 changes: 37 additions & 25 deletions metaphor/redshift/extractor.py

@@ -3,10 +3,13 @@
 from typing import Collection, Iterator, List, Set

 from metaphor.common.constants import BYTES_PER_MEGABYTES
-from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
+from metaphor.common.entity_id import dataset_normalized_name
 from metaphor.common.event_util import ENTITY_TYPES
 from metaphor.common.logger import get_logger
 from metaphor.common.models import to_dataset_statistics
+from metaphor.common.sql.table_level_lineage.table_level_lineage import (
+    extract_table_level_lineage,
+)
 from metaphor.common.tag_matcher import tag_datasets
 from metaphor.common.utils import md5_digest, start_of_day
 from metaphor.models.crawler_run_metadata import Platform
@@ -34,7 +37,11 @@
         super().__init__(config)
         self._tag_matchers = config.tag_matchers
         self._query_log_lookback_days = config.query_log.lookback_days
+
+        # Exclude metaphor user
         self._query_log_excluded_usernames = config.query_log.excluded_usernames
+        self._query_log_excluded_usernames.add(config.user)
+
         self._filter = exclude_system_databases(self._filter)

         self._logs: List[QueryLog] = []
@@ -138,44 +145,49 @@
         if query_log:
             yield query_log

-    def _process_record(self, access_event: AccessEvent):
-        if not self._filter.include_table(
-            access_event.database, access_event.schema, access_event.table
-        ):
-            return
+    def _is_related_query_log(self, queried_datasets: List[QueriedDataset]) -> bool:
+        for dataset in queried_datasets:
+            table_name = dataset_normalized_name(
+                db=dataset.database, schema=dataset.schema, table=dataset.table
+            )
+
+            if table_name in self._datasets:
+                return True
+        return False

+    def _process_record(self, access_event: AccessEvent):
         if access_event.usename in self._query_log_excluded_usernames:
             return

-        sources = [self._convert_resource_to_queried_dataset(access_event)]
+        tll = extract_table_level_lineage(
+            sql=access_event.querytxt,
+            platform=DataPlatform.REDSHIFT,
+            account=None,
+            query_id=str(access_event.query_id),
+            default_database=access_event.database,
+        )
+
+        if not (
+            self._is_related_query_log(tll.sources)
+            or self._is_related_query_log(tll.targets)
+        ):
+            return

         query_log = QueryLog(
-            id=f"{DataPlatform.REDSHIFT.name}:{access_event.query}",
-            query_id=str(access_event.query),
+            id=f"{DataPlatform.REDSHIFT.name}:{access_event.query_id}",
+            query_id=str(access_event.query_id),
             platform=DataPlatform.REDSHIFT,
-            start_time=access_event.starttime,
+            start_time=access_event.start_time,
             duration=float(
-                (access_event.endtime - access_event.starttime).total_seconds()
+                (access_event.end_time - access_event.start_time).total_seconds()
             ),
             user_id=access_event.usename,
             rows_read=float(access_event.rows),
             bytes_read=float(access_event.bytes),
-            sources=sources,
+            sources=tll.sources,
+            targets=tll.targets,
             sql=access_event.querytxt,
             sql_hash=md5_digest(access_event.querytxt.encode("utf-8")),
         )

         return query_log

-    @staticmethod
-    def _convert_resource_to_queried_dataset(event: AccessEvent) -> QueriedDataset:
-        dataset_name = dataset_normalized_name(
-            event.database, event.schema, event.table
-        )
-        dataset_id = str(to_dataset_entity_id(dataset_name, DataPlatform.REDSHIFT))
-        return QueriedDataset(
-            id=dataset_id,
-            database=event.database,
-            schema=event.schema,
-            table=event.table,
-        )
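The net effect: instead of attributing each access event to the single table reported by `stl_scan`, the extractor now parses the query text for table-level lineage and keeps the log only when a parsed source or target matches a dataset found earlier in the crawl. A condensed sketch of that flow, reusing the signatures from this diff (the sample SQL, the crawled-dataset set, and the `DataPlatform` import path are illustrative):

```python
from metaphor.common.entity_id import dataset_normalized_name
from metaphor.common.sql.table_level_lineage.table_level_lineage import (
    extract_table_level_lineage,
)
from metaphor.models.metadata_change_event import DataPlatform  # assumed path

# Parse the raw SQL once to recover source and target tables.
tll = extract_table_level_lineage(
    sql="INSERT INTO schema1.table1 SELECT * FROM schema1.table2",
    platform=DataPlatform.REDSHIFT,
    account=None,
    query_id="42",
    default_database="db1",
)

# Keep the query log only if any source or target is a crawled dataset.
crawled = {"db1.schema1.table1", "db1.schema1.table2"}
related = any(
    dataset_normalized_name(db=d.database, schema=d.schema, table=d.table) in crawled
    for d in [*tll.sources, *tll.targets]
)
```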
2 changes: 1 addition & 1 deletion pyproject.toml

@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.14.84"
+version = "0.14.85"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor <[email protected]>"]
10 changes: 6 additions & 4 deletions tests/redshift/query_logs.json

@@ -17,8 +17,9 @@
       "table": "table1"
     }
   ],
-  "sql": "UaEktANLgExavLlDcKmu",
-  "sqlHash": "b871153e9d0d386ce0355159f196f0ad",
+  "targets": [],
+  "sql": "select * from schema1.table1",
+  "sqlHash": "fbbd533367fe766d71a48425c5c48654",
   "startTime": "2010-06-26T10:07:18.014673",
   "userId": "aZUytYDMFTryuKyWtbEM"
 },
@@ -37,8 +38,9 @@
       "table": "table2"
     }
   ],
-  "sql": "qUKVJPZNJEmeMNSgnVkF",
-  "sqlHash": "2d1aea13de8a75a410b37c223246810f",
+  "targets": [],
+  "sql": "select * from schema2.table2",
+  "sqlHash": "9f98db863e022a1b0c8b0895c1d4dec1",
   "startTime": "2003-10-01T10:15:51.904151",
   "userId": "IevfvBUzEVUDrTbaIWKY"
 }
30 changes: 12 additions & 18 deletions tests/redshift/test_extractor.py

@@ -47,34 +47,26 @@ def test_collect_query_logs(test_root_dir: str) -> None:
     # Random stuff generated with polyfactory
     access_events = [
         AccessEvent(
-            userid=7610,
-            query=7086,
+            user_id=7610,
+            query_id=7086,
             usename="aZUytYDMFTryuKyWtbEM",
-            tbl=9923,
             rows=7155,
             bytes=6500,
-            querytxt="UaEktANLgExavLlDcKmu",
+            querytxt="select * from schema1.table1",
             database="db1",
-            schema="schema1",
-            table="table1",
-            starttime=datetime.datetime(2010, 6, 26, 10, 7, 18, 14673),
-            endtime=datetime.datetime(1999, 10, 8, 12, 25, 39, 706365),
-            aborted=3583,
+            start_time=datetime.datetime(2010, 6, 26, 10, 7, 18, 14673),
+            end_time=datetime.datetime(1999, 10, 8, 12, 25, 39, 706365),
         ),
         AccessEvent(
-            userid=8902,
-            query=9910,
+            user_id=8902,
+            query_id=9910,
             usename="IevfvBUzEVUDrTbaIWKY",
-            tbl=8494,
             rows=4617,
             bytes=176,
-            querytxt="qUKVJPZNJEmeMNSgnVkF",
+            querytxt="select * from schema2.table2",
             database="db2",
-            schema="schema2",
-            table="table2",
-            starttime=datetime.datetime(2003, 10, 1, 10, 15, 51, 904151),
-            endtime=datetime.datetime(1996, 11, 14, 17, 6, 9, 676224),
-            aborted=5146,
+            start_time=datetime.datetime(2003, 10, 1, 10, 15, 51, 904151),
+            end_time=datetime.datetime(1996, 11, 14, 17, 6, 9, 676224),
         ),
     ]

@@ -103,6 +95,8 @@ async def __anext__(self):

     extractor = RedshiftExtractor(dummy_config())
     extractor._included_databases = included_dbs
+    extractor._datasets["db1.schema1.table1"] = 1
+    extractor._datasets["db2.schema2.table2"] = 1
     query_logs = wrap_query_log_stream_to_event(extractor.collect_query_logs())
     expected = f"{test_root_dir}/redshift/query_logs.json"
     assert query_logs == load_json(expected)