diff --git a/metaphor/athena/README.md b/metaphor/athena/README.md index 5902f046..76d012cf 100644 --- a/metaphor/athena/README.md +++ b/metaphor/athena/README.md @@ -63,6 +63,11 @@ query_log: # (Optional) Number of days of query logs to fetch. Default to 1. If 0, the no query logs will be fetched. lookback_days: + # (Optional) WorkGroups to collect query history from, default to []. If not specified, query history is collected from the primary workgroup + work_groups: + - workgroup_1 + - ... + # (Optional) process_query: ``` diff --git a/metaphor/athena/config.py b/metaphor/athena/config.py index 57f13618..32bd2a2d 100644 --- a/metaphor/athena/config.py +++ b/metaphor/athena/config.py @@ -1,4 +1,5 @@ from dataclasses import field +from typing import List from pydantic.dataclasses import dataclass @@ -14,6 +15,9 @@ class QueryLogConfig: # Number of days back of query logs to fetch, if 0, don't fetch query logs lookback_days: int = 1 + # (Optional) WorkGroups to collect query history from, default to []. If not specified, query history is collected from the primary workgroup + work_groups: List[str] = field(default_factory=list) + # Config to control query processing process_query: ProcessQueryConfig = field( default_factory=lambda: ProcessQueryConfig() diff --git a/metaphor/athena/extractor.py b/metaphor/athena/extractor.py index 8d493f00..8fd82246 100644 --- a/metaphor/athena/extractor.py +++ b/metaphor/athena/extractor.py @@ -80,10 +80,20 @@ async def extract(self) -> Collection[ENTITY_TYPES]: return self._datasets.values() def collect_query_logs(self) -> Iterator[QueryLog]: + # If empty, don't call list_query_executions with WorkGroup + work_groups = self._query_log_config.work_groups or [""] + if self._query_log_config.lookback_days > 0: - for page in self._paginate_and_dump_response("list_query_executions"): - ids = page["QueryExecutionIds"] - yield from self._batch_get_queries(ids) + for workgroup in work_groups: + params = {} + if workgroup: + params["WorkGroup"] = workgroup + + for page in 
self._paginate_and_dump_response( + "list_query_executions", **params + ): + ids = page["QueryExecutionIds"] + yield from self._batch_get_queries(ids) def _get_catalogs(self): database_names = [] diff --git a/pyproject.toml b/pyproject.toml index fd2793db..497665e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.129" +version = "0.14.130" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "]