diff --git a/metaphor/athena/README.md b/metaphor/athena/README.md index 1e0caeb4..5902f046 100644 --- a/metaphor/athena/README.md +++ b/metaphor/athena/README.md @@ -54,6 +54,23 @@ aws: See [Output Config](../common/docs/output.md) for more information. +#### Query Log Extraction Configurations + +The Athena connector will enable query log extraction by default + +```yaml +query_log: + # (Optional) Number of days of query logs to fetch. Default to 1. If 0, the no query logs will be fetched. + lookback_days: + + # (Optional) + process_query: +``` + +##### Process Query Config + +See [Process Query](../common/docs/process_query.md) for more information on the optional `process_query_config` config. + ## Testing Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). diff --git a/metaphor/athena/config.py b/metaphor/athena/config.py index 288541f9..57f13618 100644 --- a/metaphor/athena/config.py +++ b/metaphor/athena/config.py @@ -6,6 +6,18 @@ from metaphor.common.base_config import BaseConfig from metaphor.common.dataclass import ConnectorConfig from metaphor.common.filter import DatasetFilter +from metaphor.common.sql.process_query.config import ProcessQueryConfig + + +@dataclass(config=ConnectorConfig) +class QueryLogConfig: + # Number of days back of query logs to fetch, if 0, don't fetch query logs + lookback_days: int = 1 + + # Config to control query processing + process_query: ProcessQueryConfig = field( + default_factory=lambda: ProcessQueryConfig() + ) @dataclass(config=ConnectorConfig) @@ -14,3 +26,6 @@ class AthenaRunConfig(BaseConfig): # Include or exclude specific databases/schemas/tables filter: DatasetFilter = field(default_factory=lambda: DatasetFilter()) + + # configs for fetching query logs + query_log: QueryLogConfig = field(default_factory=lambda: QueryLogConfig()) diff --git a/metaphor/athena/extractor.py b/metaphor/athena/extractor.py index e7869e71..8d493f00 100644 --- a/metaphor/athena/extractor.py +++ b/metaphor/athena/extractor.py @@ -12,10 +12,11 @@ from metaphor.common.entity_id import dataset_normalized_name from metaphor.common.event_util import ENTITY_TYPES from metaphor.common.logger import get_logger, json_dump_to_debug_file +from metaphor.common.sql.query_log import PartialQueryLog, process_and_init_query_log from metaphor.common.sql.table_level_lineage.table_level_lineage import ( extract_table_level_lineage, ) -from metaphor.common.utils import chunks, md5_digest, to_utc_time +from metaphor.common.utils import chunks, start_of_day, to_utc_time from metaphor.models.crawler_run_metadata import Platform from metaphor.models.metadata_change_event import ( DataPlatform, @@ -56,6 +57,7 @@ def __init__(self, config: AthenaRunConfig) -> None: self._datasets: Dict[str, Dataset] = {} self._aws_config = config.aws self._filter = config.filter.normalize() + self._query_log_config = config.query_log async def extract(self) -> Collection[ENTITY_TYPES]: logger.info("Fetching metadata from Athena") @@ -78,9 +80,10 @@ async def extract(self) -> Collection[ENTITY_TYPES]: return self._datasets.values() def collect_query_logs(self) -> Iterator[QueryLog]: - for page in self._paginate_and_dump_response("list_query_executions"): - ids = page["QueryExecutionIds"] - yield from self._batch_get_queries(ids) + if self._query_log_config.lookback_days > 0: + for page in self._paginate_and_dump_response("list_query_executions"): + ids = page["QueryExecutionIds"] + yield from self._batch_get_queries(ids) def _get_catalogs(self): database_names = [] @@ -180,12 +183,10 @@ def _init_dataset(self, catalog: str, database: str, table_metadata: TableMetada def _batch_get_queries(self, query_execution_ids: List[str]) -> List[QueryLog]: query_logs: List[QueryLog] = [] + lookback_start_time = start_of_day(self._query_log_config.lookback_days) + for ids in chunks(query_execution_ids, 50): raw_response = self._client.batch_get_query_execution(QueryExecutionIds=ids) - request_id = raw_response["ResponseMetadata"]["RequestId"] - json_dump_to_debug_file( - raw_response, f"batch_get_query_execution_{request_id}.json" - ) response = BatchGetQueryExecutionResponse(**raw_response) for unprocessed in response.UnprocessedQueryExecutionIds or []: @@ -207,6 +208,16 @@ def _batch_get_queries(self, query_execution_ids: List[str]) -> List[QueryLog]: (context.Catalog, context.Database) if context else (None, None) ) + start_time = ( + to_utc_time(query_execution.Status.SubmissionDateTime) + if query_execution.Status + and query_execution.Status.SubmissionDateTime + else None + ) + + if start_time and start_time < lookback_start_time: + continue + tll = extract_table_level_lineage( sql=query, platform=DataPlatform.ATHENA, @@ -215,28 +226,24 @@ def _batch_get_queries(self, query_execution_ids: List[str]) -> List[QueryLog]: default_schema=schema, ) - start_time = ( - to_utc_time(query_execution.Status.SubmissionDateTime) - if query_execution.Status - and query_execution.Status.SubmissionDateTime - else None - ) - - query_logs.append( - QueryLog( + query_log = process_and_init_query_log( + query=query, + platform=DataPlatform.ATHENA, + process_query_config=self._query_log_config.process_query, + query_log=PartialQueryLog( duration=( query_execution.Statistics.TotalExecutionTimeInMillis if query_execution.Statistics else None ), - platform=DataPlatform.ATHENA, - query_id=query_execution.QueryExecutionId, sources=tll.sources, targets=tll.targets, - sql=query, - sql_hash=md5_digest(query.encode("utf-8")), start_time=start_time, - ) + ), + query_id=query_execution.QueryExecutionId, ) + if query_log: + query_logs.append(query_log) + return query_logs diff --git a/pyproject.toml b/pyproject.toml index df4b2bd7..62c46348 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.127" +version = "0.14.128" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/athena/data/batch_get_query_executions_ef77e890-ca4f-4e46-b4d3-ee90915237ae.json b/tests/athena/data/batch_get_query_executions_ef77e890-ca4f-4e46-b4d3-ee90915237ae.json index ffd8acd7..def95a18 100644 --- a/tests/athena/data/batch_get_query_executions_ef77e890-ca4f-4e46-b4d3-ee90915237ae.json +++ b/tests/athena/data/batch_get_query_executions_ef77e890-ca4f-4e46-b4d3-ee90915237ae.json @@ -45,7 +45,7 @@ }, { "QueryExecutionId": "8c944c49-ccc0-43b2-9dc9-e2428c76e8ab", - "Query": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10", + "Query": "SELECT * FROM \"spectrum_db2\".\"sales\" WHERE id = '00001' limit 10", "StatementType": "DML", "ResultConfiguration": { "OutputLocation": "s3://metaphor-athena-output/athena/8c944c49-ccc0-43b2-9dc9-e2428c76e8ab.csv", diff --git a/tests/athena/expected_query_logs.json b/tests/athena/expected_query_logs.json index 4821ce00..d797f4eb 100644 --- a/tests/athena/expected_query_logs.json +++ b/tests/athena/expected_query_logs.json @@ -1,5 +1,6 @@ [ { + "_id": "ATHENA:a52ba71d-7cca-42e1-b4dc-db8c5c59e4ff", "duration": 1360, "platform": "ATHENA", "queryId": "a52ba71d-7cca-42e1-b4dc-db8c5c59e4ff", @@ -11,12 +12,13 @@ "table": "sales" } ], - "sql": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10", + "sql": "SELECT * FROM \"spectrum_db2\".\"sales\" LIMIT ''", "sqlHash": "ab84f70dad2f22c7680b41d0dc8ca73d", "startTime": "2024-10-03T00:50:12.646000+00:00", "targets": [] }, { + "_id": "ATHENA:8c944c49-ccc0-43b2-9dc9-e2428c76e8ab", "duration": 762, "platform": "ATHENA", "queryId": "8c944c49-ccc0-43b2-9dc9-e2428c76e8ab", @@ -28,12 +30,13 @@ "table": "sales" } ], - "sql": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10", - "sqlHash": "ab84f70dad2f22c7680b41d0dc8ca73d", + "sql": "SELECT * FROM \"spectrum_db2\".\"sales\" WHERE id = '' LIMIT ''", + "sqlHash": "459ebefd29191afbc3a59e313d5acdb8", "startTime": "2024-10-02T16:48:48.443000+00:00", "targets": [] }, { + "_id": "ATHENA:a80c3d38-5a82-450c-a7f3-58bc476597d8", "duration": 547, "platform": "ATHENA", "queryId": "a80c3d38-5a82-450c-a7f3-58bc476597d8", @@ -45,7 +48,7 @@ "table": "sales" } ], - "sql": "-- View Example\nCREATE OR REPLACE VIEW sales_view AS\nSELECT salesid, listid, sellerid, buyerid, dateid, qtysold, pricepaid, commission, saletime\nFROM sales\nWHERE commission > 10", + "sql": "/* View Example */ CREATE OR REPLACE VIEW sales_view AS SELECT salesid, listid, sellerid, buyerid, dateid, qtysold, pricepaid, commission, saletime FROM sales WHERE commission > ''", "sqlHash": "f89a26b30eb0e4fac013ff31f4ea900e", "startTime": "2024-10-03T00:50:38.339000+00:00", "targets": [ diff --git a/tests/athena/test_extractor.py b/tests/athena/test_extractor.py index 9f49a107..1c46cb1c 100644 --- a/tests/athena/test_extractor.py +++ b/tests/athena/test_extractor.py @@ -1,12 +1,17 @@ from unittest.mock import MagicMock, patch import pytest +from freezegun import freeze_time -from metaphor.athena.config import AthenaRunConfig, AwsCredentials +from metaphor.athena.config import AthenaRunConfig, AwsCredentials, QueryLogConfig from metaphor.athena.extractor import AthenaExtractor from metaphor.common.base_config import OutputConfig from metaphor.common.event_util import EventUtil from metaphor.common.filter import DatasetFilter +from metaphor.common.sql.process_query.config import ( + ProcessQueryConfig, + RedactPIILiteralsConfig, +) from tests.test_utils import load_json @@ -21,6 +26,11 @@ def dummy_config(): "awsdatacatalog": {"foo": None, "spectrum_db2": set(["table"])}, } ), + query_log=QueryLogConfig( + process_query=ProcessQueryConfig( + redact_literals=RedactPIILiteralsConfig(enabled=True) + ) + ), output=OutputConfig(), ) @@ -69,6 +79,7 @@ def mock_get_paginator(method: str): @patch("metaphor.athena.extractor.create_athena_client") @pytest.mark.asyncio +@freeze_time("2024-10-03") async def test_collect_query_logs(mock_create_client: MagicMock, test_root_dir: str): def mock_list_query_executions(**_): yield load_json(