From f1e25f21f4c0b3d39d8706e0d1226b0f864f46fe Mon Sep 17 00:00:00 2001
From: Scott Huang
Date: Thu, 22 Aug 2024 15:27:50 +0800
Subject: [PATCH 1/3] Use sys_* system views for query logs

---
 metaphor/oracle/extractor.py      |  6 +--
 metaphor/redshift/README.md       |  7 +++-
 metaphor/redshift/access_event.py | 63 ++++++++++++-------------
 metaphor/redshift/extractor.py    | 47 +++++++++++++++++------
 tests/redshift/query_logs.json    | 10 +++--
 tests/redshift/test_extractor.py  | 30 ++++++---------
 6 files changed, 87 insertions(+), 76 deletions(-)

diff --git a/metaphor/oracle/extractor.py b/metaphor/oracle/extractor.py
index 6f771bcb..dc4d4e6d 100644
--- a/metaphor/oracle/extractor.py
+++ b/metaphor/oracle/extractor.py
@@ -198,7 +198,7 @@ def _inner_fetch_query_logs(
         schema = user.lower() if user else None
         database = self._database if self._database else None
 
-        ttl = extract_table_level_lineage(
+        tll = extract_table_level_lineage(
             query,
             platform=DataPlatform.ORACLE,
             account=self._alternative_host or self._config.host,
@@ -221,9 +221,9 @@
                 start_time=to_utc_time(start),
                 bytes_read=safe_float(read_bytes),
                 bytes_written=safe_float(write_bytes),
-                sources=ttl.sources,
+                sources=tll.sources,
                 rows_read=safe_float(row_count),
-                targets=ttl.targets,
+                targets=tll.targets,
             )
         )
     return logs
diff --git a/metaphor/redshift/README.md b/metaphor/redshift/README.md
index 440746c6..fabd4e8e 100644
--- a/metaphor/redshift/README.md
+++ b/metaphor/redshift/README.md
@@ -12,11 +12,14 @@ Use the following command to grant the permission:
 # Create a new user called "metaphor"
 CREATE USER metaphor PASSWORD <password>;
 
-# Grant minimally required privleages to the user
+# Grant minimally required privileges to the user
 GRANT SELECT ON pg_catalog.svv_table_info TO metaphor;
 
-# Grant access to syslog "STL_SCAN" and "STL_QUERY"
+# Grant access to syslog "SYS_*"
 ALTER USER metaphor WITH SYSLOG ACCESS UNRESTRICTED;
+
+# Grant access to "PG_USER_INFO"
+GRANT SELECT ON pg_catalog.pg_user_info TO metaphor;
 ```
 
 > Note: If the Redshift cluster contains more than one database, you must grant the permission in all databases. Alternatively, you can limit the connector to a subset of databases using the [filter configuration](../common/docs/filter.md).
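A quick way for reviewers to sanity-check the grants above is to probe `sys_query_history` as the `metaphor` user; with `SYSLOG ACCESS UNRESTRICTED` in effect it should return other users' queries, not just its own. A minimal sketch — the column names follow the updated query in `access_event.py` below, while the one-day window and `LIMIT` are arbitrary choices:

```sql
-- Run as the "metaphor" user after applying the grants above.
SELECT query_id, user_id, TRIM(database_name) AS database_name, status, start_time, end_time
FROM sys_query_history
WHERE start_time >= DATEADD(day, -1, GETDATE())
ORDER BY end_time DESC
LIMIT 10;
```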
diff --git a/metaphor/redshift/access_event.py b/metaphor/redshift/access_event.py
index 8aca813f..76fdb490 100644
--- a/metaphor/redshift/access_event.py
+++ b/metaphor/redshift/access_event.py
@@ -8,12 +8,12 @@ WITH queries AS (
     SELECT
         *,
-        SUM(LEN(text)) OVER (PARTITION BY query) AS length
+        SUM(LEN(text)) OVER (PARTITION BY query_id) AS length
     FROM
-        stl_querytext
+        sys_query_text
 ),
 filtered AS (
     SELECT
-        q.query,
+        q.query_id,
         LISTAGG(
             CASE
             WHEN LEN(RTRIM(q.text)) = 0 THEN q.text
@@ -26,31 +26,25 @@ WHERE
     q.length < 65536
 GROUP BY
-    q.query
+    q.query_id
 )
 SELECT DISTINCT
-    ss.userid,
-    ss.query,
-    sui.usename,
-    ss.rows,
-    ss.bytes,
-    ss.tbl,
+    sqh.user_id,
+    sqh.query_id,
+    pu.usename,
+    sqh.returned_rows AS rows,
+    sqh.returned_bytes AS bytes,
     q.querytxt,
-    sti.database,
-    sti.schema,
-    sti.table,
-    sq.starttime,
-    sq.endtime,
-    sq.aborted
-FROM stl_scan ss
-    JOIN svv_table_info sti ON ss.tbl = sti.table_id
-    JOIN stl_query sq ON ss.query = sq.query
-    JOIN filtered q ON ss.query = q.query
-    JOIN svl_user_info sui ON sq.userid = sui.usesysid
-WHERE ss.starttime >= '{start_time}'
-    AND ss.starttime < '{end_time}'
-    AND sq.aborted = 0
-ORDER BY ss.endtime DESC;
+    trim(sqh.database_name) AS database,
+    sqh.start_time,
+    sqh.end_time
+FROM sys_query_history sqh
+    JOIN filtered q ON sqh.query_id = q.query_id
+    JOIN pg_user_info pu ON sqh.user_id = pu.usesysid
+WHERE sqh.status = 'success'
+    AND sqh.start_time >= '{start_time}'
+    AND sqh.start_time < '{end_time}'
+ORDER BY sqh.end_time DESC;
 """
 
 """
 The condition `length < 65536` is because Redshift's LISTAGG method
@@ -61,19 +55,15 @@
 
 @dataclass(frozen=True)
 class AccessEvent:
-    userid: int
-    query: int
+    user_id: int
+    query_id: int
     usename: str
-    tbl: int
     rows: int
     bytes: int
     querytxt: str
     database: str
-    schema: str
-    table: str
-    starttime: datetime
-    endtime: datetime
-    aborted: int
+    end_time: datetime
+    start_time: datetime
 
     @staticmethod
     def from_record(record: Record) -> "AccessEvent":
@@ -87,16 +77,13 @@ def from_record(record: Record) -> "AccessEvent":
             if isinstance(v, str):
                 record[k] = v.strip()
 
-        record["starttime"] = record["starttime"].replace(tzinfo=timezone.utc)
-        record["endtime"] = record["endtime"].replace(tzinfo=timezone.utc)
+        record["start_time"] = record["start_time"].replace(tzinfo=timezone.utc)
+        record["end_time"] = record["end_time"].replace(tzinfo=timezone.utc)
 
         record["querytxt"] = record["querytxt"].replace("\\n", "\n")
 
         return AccessEvent(**record)
 
-    def table_name(self) -> str:
-        return f"{self.database}.{self.schema}.{self.table}"
-
     @staticmethod
     async def fetch_access_event(
         conn: Connection,
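One detail worth flagging in the rewritten query: like `stl_querytext` before it, `sys_query_text` stores long statements as multiple text segments, so the `filtered` CTE stitches them back together with `LISTAGG` and skips any statement of 65536 bytes or more, since `LISTAGG` errors out when its result exceeds 65535 bytes. A standalone sketch of the same pattern — the `sequence` column ordering the segments is an assumption based on the Redshift documentation, not part of this patch:

```sql
-- Reassemble segmented query text, skipping statements whose stitched
-- length would overflow LISTAGG's 65535-byte result limit.
WITH lengths AS (
    SELECT query_id, SUM(LEN(text)) AS total_length
    FROM sys_query_text
    GROUP BY query_id
)
SELECT t.query_id,
       LISTAGG(t.text) WITHIN GROUP (ORDER BY t.sequence) AS querytxt
FROM sys_query_text t
JOIN lengths l ON t.query_id = l.query_id
WHERE l.total_length < 65536
GROUP BY t.query_id;
```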
diff --git a/metaphor/redshift/extractor.py b/metaphor/redshift/extractor.py
index 4a95b89d..a57b5423 100644
--- a/metaphor/redshift/extractor.py
+++ b/metaphor/redshift/extractor.py
@@ -7,6 +7,9 @@
 from metaphor.common.event_util import ENTITY_TYPES
 from metaphor.common.logger import get_logger
 from metaphor.common.models import to_dataset_statistics
+from metaphor.common.sql.table_level_lineage.table_level_lineage import (
+    extract_table_level_lineage,
+)
 from metaphor.common.tag_matcher import tag_datasets
 from metaphor.common.utils import md5_digest, start_of_day
 from metaphor.models.crawler_run_metadata import Platform
@@ -34,7 +37,11 @@ def __init__(self, config: RedshiftRunConfig):
         super().__init__(config)
         self._tag_matchers = config.tag_matchers
         self._query_log_lookback_days = config.query_log.lookback_days
+
+        # Exclude the metaphor user
         self._query_log_excluded_usernames = config.query_log.excluded_usernames
+        self._query_log_excluded_usernames.add(config.user)
+
 
         self._filter = exclude_system_databases(self._filter)
         self._logs: List[QueryLog] = []
@@ -138,29 +145,47 @@ async def _fetch_query_logs(self, conn):
             if query_log:
                 yield query_log
 
-    def _process_record(self, access_event: AccessEvent):
-        if not self._filter.include_table(
-            access_event.database, access_event.schema, access_event.table
-        ):
-            return
+    def _is_related_query_log(self, queried_datasets: List[QueriedDataset]) -> bool:
+        for dataset in queried_datasets:
+            table_name = dataset_normalized_name(
+                db=dataset.database, schema=dataset.schema, table=dataset.table
+            )
+
+            if table_name in self._datasets:
+                return True
+        return False
 
+    def _process_record(self, access_event: AccessEvent):
         if access_event.usename in self._query_log_excluded_usernames:
             return
 
-        sources = [self._convert_resource_to_queried_dataset(access_event)]
+        tll = extract_table_level_lineage(
+            sql=access_event.querytxt,
+            platform=DataPlatform.REDSHIFT,
+            account=None,
+            query_id=str(access_event.query_id),
+            default_database=access_event.database,
+        )
+
+        if not (
+            self._is_related_query_log(tll.sources)
+            or self._is_related_query_log(tll.targets)
+        ):
+            return
 
         query_log = QueryLog(
-            id=f"{DataPlatform.REDSHIFT.name}:{access_event.query}",
-            query_id=str(access_event.query),
+            id=f"{DataPlatform.REDSHIFT.name}:{access_event.query_id}",
+            query_id=str(access_event.query_id),
             platform=DataPlatform.REDSHIFT,
-            start_time=access_event.starttime,
+            start_time=access_event.start_time,
             duration=float(
-                (access_event.endtime - access_event.starttime).total_seconds()
+                (access_event.end_time - access_event.start_time).total_seconds()
             ),
             user_id=access_event.usename,
             rows_read=float(access_event.rows),
             bytes_read=float(access_event.bytes),
-            sources=sources,
+            sources=tll.sources,
+            targets=tll.targets,
             sql=access_event.querytxt,
             sql_hash=md5_digest(access_event.querytxt.encode("utf-8")),
         )
diff --git a/tests/redshift/query_logs.json b/tests/redshift/query_logs.json
index 72e72de4..c52aae11 100644
--- a/tests/redshift/query_logs.json
+++ b/tests/redshift/query_logs.json
@@ -17,8 +17,9 @@
         "table": "table1"
       }
     ],
-    "sql": "UaEktANLgExavLlDcKmu",
-    "sqlHash": "b871153e9d0d386ce0355159f196f0ad",
+    "targets": [],
+    "sql": "select * from schema1.table1",
+    "sqlHash": "fbbd533367fe766d71a48425c5c48654",
     "startTime": "2010-06-26T10:07:18.014673",
     "userId": "aZUytYDMFTryuKyWtbEM"
   },
@@ -37,8 +38,9 @@
         "table": "table2"
       }
     ],
-    "sql": "qUKVJPZNJEmeMNSgnVkF",
-    "sqlHash": "2d1aea13de8a75a410b37c223246810f",
+    "targets": [],
+    "sql": "select * from schema2.table2",
+    "sqlHash": "9f98db863e022a1b0c8b0895c1d4dec1",
     "startTime": "2003-10-01T10:15:51.904151",
     "userId": "IevfvBUzEVUDrTbaIWKY"
   }
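The fixture's new `sqlHash` values are simply the MD5 hex digests of the new `sql` strings, matching `sql_hash=md5_digest(access_event.querytxt.encode("utf-8"))` in the extractor. If `md5_digest` is the thin `hashlib` wrapper it appears to be (an assumption; the real helper lives in `metaphor.common.utils`), the hashes can be regenerated like this:

```python
import hashlib


def md5_digest(data: bytes) -> str:
    # Assumed equivalent of metaphor.common.utils.md5_digest.
    return hashlib.md5(data).hexdigest()


print(md5_digest("select * from schema1.table1".encode("utf-8")))
# Should print fbbd533367fe766d71a48425c5c48654, the value in the fixture above.
```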
diff --git a/tests/redshift/test_extractor.py b/tests/redshift/test_extractor.py
index b9a9806c..a448b5ad 100644
--- a/tests/redshift/test_extractor.py
+++ b/tests/redshift/test_extractor.py
@@ -47,34 +47,26 @@ def test_collect_query_logs(test_root_dir: str) -> None:
     # Random stuff generated with polyfactory
     access_events = [
         AccessEvent(
-            userid=7610,
-            query=7086,
+            user_id=7610,
+            query_id=7086,
             usename="aZUytYDMFTryuKyWtbEM",
-            tbl=9923,
             rows=7155,
             bytes=6500,
-            querytxt="UaEktANLgExavLlDcKmu",
+            querytxt="select * from schema1.table1",
             database="db1",
-            schema="schema1",
-            table="table1",
-            starttime=datetime.datetime(2010, 6, 26, 10, 7, 18, 14673),
-            endtime=datetime.datetime(1999, 10, 8, 12, 25, 39, 706365),
-            aborted=3583,
+            start_time=datetime.datetime(2010, 6, 26, 10, 7, 18, 14673),
+            end_time=datetime.datetime(1999, 10, 8, 12, 25, 39, 706365),
         ),
         AccessEvent(
-            userid=8902,
-            query=9910,
+            user_id=8902,
+            query_id=9910,
             usename="IevfvBUzEVUDrTbaIWKY",
-            tbl=8494,
             rows=4617,
             bytes=176,
-            querytxt="qUKVJPZNJEmeMNSgnVkF",
+            querytxt="select * from schema2.table2",
             database="db2",
-            schema="schema2",
-            table="table2",
-            starttime=datetime.datetime(2003, 10, 1, 10, 15, 51, 904151),
-            endtime=datetime.datetime(1996, 11, 14, 17, 6, 9, 676224),
-            aborted=5146,
+            start_time=datetime.datetime(2003, 10, 1, 10, 15, 51, 904151),
+            end_time=datetime.datetime(1996, 11, 14, 17, 6, 9, 676224),
         ),
     ]
 
@@ -103,6 +95,8 @@ async def __anext__(self):
 
     extractor = RedshiftExtractor(dummy_config())
     extractor._included_databases = included_dbs
+    extractor._datasets["db1.schema1.table1"] = 1
+    extractor._datasets["db2.schema2.table2"] = 1
     query_logs = wrap_query_log_stream_to_event(extractor.collect_query_logs())
     expected = f"{test_root_dir}/redshift/query_logs.json"
     assert query_logs == load_json(expected)

From ce35f897bdd73653214828acc988b2286f37aa2b Mon Sep 17 00:00:00 2001
From: Scott Huang
Date: Thu, 22 Aug 2024 15:28:15 +0800
Subject: [PATCH 2/3] Bump version

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 2aebcffc..81d13fcf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "metaphor-connectors"
-version = "0.14.82"
+version = "0.14.83"
 license = "Apache-2.0"
 description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
 authors = ["Metaphor "]

From 7801ac4b0c7a59e624a2357fadcc168886374a64 Mon Sep 17 00:00:00 2001
From: Scott Huang
Date: Thu, 22 Aug 2024 15:40:08 +0800
Subject: [PATCH 3/3] Drop unused code

---
 metaphor/redshift/extractor.py | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git a/metaphor/redshift/extractor.py b/metaphor/redshift/extractor.py
index a57b5423..d89b84b7 100644
--- a/metaphor/redshift/extractor.py
+++ b/metaphor/redshift/extractor.py
@@ -3,7 +3,7 @@
 from typing import Collection, Iterator, List, Set
 
 from metaphor.common.constants import BYTES_PER_MEGABYTES
-from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
+from metaphor.common.entity_id import dataset_normalized_name
 from metaphor.common.event_util import ENTITY_TYPES
 from metaphor.common.logger import get_logger
 from metaphor.common.models import to_dataset_statistics
@@ -191,16 +191,3 @@ def _process_record(self, access_event: AccessEvent):
         )
 
         return query_log
-
-    @staticmethod
-    def _convert_resource_to_queried_dataset(event: AccessEvent) -> QueriedDataset:
-        dataset_name = dataset_normalized_name(
-            event.database, event.schema, event.table
-        )
-        dataset_id = str(to_dataset_entity_id(dataset_name, DataPlatform.REDSHIFT))
-        return QueriedDataset(
-            id=dataset_id,
-            database=event.database,
-            schema=event.schema,
-            table=event.table,
-        )
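Taken together, the series replaces per-table rows from `stl_scan` with lineage parsed out of the query text, which is why `_convert_resource_to_queried_dataset` could be dropped in PATCH 3/3. A self-contained sketch of the new keep-or-skip decision in `_process_record`, with the metaphor-internal pieces replaced by stand-ins (`QueriedDataset` and `dataset_normalized_name` mimic the real helpers; the actual SQL parsing in `extract_table_level_lineage` is not reproduced here):

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class QueriedDataset:
    # Stand-in for the metaphor model of the same name.
    database: str
    schema: str
    table: str


def dataset_normalized_name(db: str, schema: str, table: str) -> str:
    # Assumed behavior of metaphor.common.entity_id.dataset_normalized_name.
    return f"{db}.{schema}.{table}".lower()


def is_related_query_log(
    queried_datasets: List[QueriedDataset], crawled: Set[str]
) -> bool:
    # Mirrors RedshiftExtractor._is_related_query_log: keep a query log only
    # if at least one table it touches was crawled in this run.
    return any(
        dataset_normalized_name(d.database, d.schema, d.table) in crawled
        for d in queried_datasets
    )


# The extractor keeps the log if either the sources or the targets parsed
# from the SQL overlap with the crawled datasets.
crawled = {"db1.schema1.table1"}
sources = [QueriedDataset("db1", "schema1", "table1")]
targets: List[QueriedDataset] = []
assert is_related_query_log(sources, crawled) or is_related_query_log(targets, crawled)
```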