Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve query log extraction for Athena connector #1011

Merged
merged 5 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions metaphor/athena/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ aws:

See [Output Config](../common/docs/output.md) for more information.

#### Query Log Extraction Configurations

The Athena connector enables query log extraction by default.

```yaml
query_log:
  # (Optional) Number of days of query logs to fetch. Defaults to 1. If 0, no query logs will be fetched.
lookback_days: <days>

# (Optional)
process_query_config: <process_query_config>
elic-eon marked this conversation as resolved.
Show resolved Hide resolved
```

##### Process Query Config

See [Process Query](../common/docs/process_query.md) for more information on the optional `process_query_config` config.

## Testing

Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv).
Expand Down
15 changes: 15 additions & 0 deletions metaphor/athena/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
from metaphor.common.base_config import BaseConfig
from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.filter import DatasetFilter
from metaphor.common.sql.process_query.config import ProcessQueryConfig


@dataclass(config=ConnectorConfig)
class QueryLogConfig:
    # How many days of query history to fetch; a value of 0 disables
    # query log extraction entirely
    lookback_days: int = 1

    # Settings controlling how each fetched query is processed
    process_query: ProcessQueryConfig = field(default_factory=ProcessQueryConfig)


@dataclass(config=ConnectorConfig)
Expand All @@ -14,3 +26,6 @@ class AthenaRunConfig(BaseConfig):

# Include or exclude specific databases/schemas/tables
filter: DatasetFilter = field(default_factory=lambda: DatasetFilter())

# configs for fetching query logs
query_log: QueryLogConfig = field(default_factory=lambda: QueryLogConfig())
51 changes: 29 additions & 22 deletions metaphor/athena/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
from metaphor.common.entity_id import dataset_normalized_name
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.logger import get_logger, json_dump_to_debug_file
from metaphor.common.sql.query_log import PartialQueryLog, process_and_init_query_log
from metaphor.common.sql.table_level_lineage.table_level_lineage import (
extract_table_level_lineage,
)
from metaphor.common.utils import chunks, md5_digest, to_utc_time
from metaphor.common.utils import chunks, start_of_day, to_utc_time
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
Expand Down Expand Up @@ -56,6 +57,7 @@
self._datasets: Dict[str, Dataset] = {}
self._aws_config = config.aws
self._filter = config.filter.normalize()
self._query_log_config = config.query_log

async def extract(self) -> Collection[ENTITY_TYPES]:
logger.info("Fetching metadata from Athena")
Expand All @@ -78,9 +80,10 @@
return self._datasets.values()

def collect_query_logs(self) -> Iterator[QueryLog]:
for page in self._paginate_and_dump_response("list_query_executions"):
ids = page["QueryExecutionIds"]
yield from self._batch_get_queries(ids)
if self._query_log_config.lookback_days > 0:
for page in self._paginate_and_dump_response("list_query_executions"):
ids = page["QueryExecutionIds"]
yield from self._batch_get_queries(ids)

def _get_catalogs(self):
database_names = []
Expand Down Expand Up @@ -180,12 +183,10 @@

def _batch_get_queries(self, query_execution_ids: List[str]) -> List[QueryLog]:
query_logs: List[QueryLog] = []
lookback_start_time = start_of_day(self._query_log_config.lookback_days)

for ids in chunks(query_execution_ids, 50):
raw_response = self._client.batch_get_query_execution(QueryExecutionIds=ids)
request_id = raw_response["ResponseMetadata"]["RequestId"]
json_dump_to_debug_file(
raw_response, f"batch_get_query_execution_{request_id}.json"
)

response = BatchGetQueryExecutionResponse(**raw_response)
for unprocessed in response.UnprocessedQueryExecutionIds or []:
Expand All @@ -207,6 +208,16 @@
(context.Catalog, context.Database) if context else (None, None)
)

start_time = (
to_utc_time(query_execution.Status.SubmissionDateTime)
if query_execution.Status
and query_execution.Status.SubmissionDateTime
else None
)

if start_time and start_time < lookback_start_time:
continue

Check warning on line 219 in metaphor/athena/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/athena/extractor.py#L219

Added line #L219 was not covered by tests

tll = extract_table_level_lineage(
sql=query,
platform=DataPlatform.ATHENA,
Expand All @@ -215,28 +226,24 @@
default_schema=schema,
)

start_time = (
to_utc_time(query_execution.Status.SubmissionDateTime)
if query_execution.Status
and query_execution.Status.SubmissionDateTime
else None
)

query_logs.append(
QueryLog(
query_log = process_and_init_query_log(
query=query,
platform=DataPlatform.ATHENA,
process_query_config=self._query_log_config.process_query,
query_log=PartialQueryLog(
duration=(
query_execution.Statistics.TotalExecutionTimeInMillis
if query_execution.Statistics
else None
),
platform=DataPlatform.ATHENA,
query_id=query_execution.QueryExecutionId,
sources=tll.sources,
targets=tll.targets,
sql=query,
sql_hash=md5_digest(query.encode("utf-8")),
start_time=start_time,
)
),
query_id=query_execution.QueryExecutionId,
)

if query_log:
query_logs.append(query_log)

return query_logs
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.127"
version = "0.14.128"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
},
{
"QueryExecutionId": "8c944c49-ccc0-43b2-9dc9-e2428c76e8ab",
"Query": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10",
"Query": "SELECT * FROM \"spectrum_db2\".\"sales\" WHERE id = '00001' limit 10",
"StatementType": "DML",
"ResultConfiguration": {
"OutputLocation": "s3://metaphor-athena-output/athena/8c944c49-ccc0-43b2-9dc9-e2428c76e8ab.csv",
Expand Down
11 changes: 7 additions & 4 deletions tests/athena/expected_query_logs.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[
{
"_id": "ATHENA:a52ba71d-7cca-42e1-b4dc-db8c5c59e4ff",
"duration": 1360,
"platform": "ATHENA",
"queryId": "a52ba71d-7cca-42e1-b4dc-db8c5c59e4ff",
Expand All @@ -11,12 +12,13 @@
"table": "sales"
}
],
"sql": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10",
"sql": "SELECT * FROM \"spectrum_db2\".\"sales\" LIMIT '<REDACTED>'",
"sqlHash": "ab84f70dad2f22c7680b41d0dc8ca73d",
"startTime": "2024-10-03T00:50:12.646000+00:00",
"targets": []
},
{
"_id": "ATHENA:8c944c49-ccc0-43b2-9dc9-e2428c76e8ab",
"duration": 762,
"platform": "ATHENA",
"queryId": "8c944c49-ccc0-43b2-9dc9-e2428c76e8ab",
Expand All @@ -28,12 +30,13 @@
"table": "sales"
}
],
"sql": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10",
"sqlHash": "ab84f70dad2f22c7680b41d0dc8ca73d",
"sql": "SELECT * FROM \"spectrum_db2\".\"sales\" WHERE id = '<REDACTED>' LIMIT '<REDACTED>'",
"sqlHash": "459ebefd29191afbc3a59e313d5acdb8",
"startTime": "2024-10-02T16:48:48.443000+00:00",
"targets": []
},
{
"_id": "ATHENA:a80c3d38-5a82-450c-a7f3-58bc476597d8",
"duration": 547,
"platform": "ATHENA",
"queryId": "a80c3d38-5a82-450c-a7f3-58bc476597d8",
Expand All @@ -45,7 +48,7 @@
"table": "sales"
}
],
"sql": "-- View Example\nCREATE OR REPLACE VIEW sales_view AS\nSELECT salesid, listid, sellerid, buyerid, dateid, qtysold, pricepaid, commission, saletime\nFROM sales\nWHERE commission > 10",
"sql": "/* View Example */ CREATE OR REPLACE VIEW sales_view AS SELECT salesid, listid, sellerid, buyerid, dateid, qtysold, pricepaid, commission, saletime FROM sales WHERE commission > '<REDACTED>'",
"sqlHash": "f89a26b30eb0e4fac013ff31f4ea900e",
"startTime": "2024-10-03T00:50:38.339000+00:00",
"targets": [
Expand Down
13 changes: 12 additions & 1 deletion tests/athena/test_extractor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from unittest.mock import MagicMock, patch

import pytest
from freezegun import freeze_time

from metaphor.athena.config import AthenaRunConfig, AwsCredentials
from metaphor.athena.config import AthenaRunConfig, AwsCredentials, QueryLogConfig
from metaphor.athena.extractor import AthenaExtractor
from metaphor.common.base_config import OutputConfig
from metaphor.common.event_util import EventUtil
from metaphor.common.filter import DatasetFilter
from metaphor.common.sql.process_query.config import (
ProcessQueryConfig,
RedactPIILiteralsConfig,
)
from tests.test_utils import load_json


Expand All @@ -21,6 +26,11 @@ def dummy_config():
"awsdatacatalog": {"foo": None, "spectrum_db2": set(["table"])},
}
),
query_log=QueryLogConfig(
process_query=ProcessQueryConfig(
redact_literals=RedactPIILiteralsConfig(enabled=True)
)
),
output=OutputConfig(),
)

Expand Down Expand Up @@ -69,6 +79,7 @@ def mock_get_paginator(method: str):

@patch("metaphor.athena.extractor.create_athena_client")
@pytest.mark.asyncio
@freeze_time("2024-10-03")
async def test_collect_query_logs(mock_create_client: MagicMock, test_root_dir: str):
def mock_list_query_executions(**_):
yield load_json(
Expand Down
Loading