Skip to content

Commit

Permalink
Allow to config workgroup to collection query log from (#1013)
Browse files Browse the repository at this point in the history
* Add work_groups config

* Bump version
  • Loading branch information
elic-eon authored Oct 17, 2024
1 parent 6417920 commit c9f1c92
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 4 deletions.
5 changes: 5 additions & 0 deletions metaphor/athena/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ query_log:
# (Optional) Number of days of query logs to fetch. Default to 1. If 0, the no query logs will be fetched.
lookback_days: <days>

# (Optional) WorkGroups to collect query history, default to []. If not specify, collect from the primary workgroup
work_groups:
- workgroup_1
- ...

# (Optional)
process_query: <process_query_config>
```
Expand Down
4 changes: 4 additions & 0 deletions metaphor/athena/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import field
from typing import List

from pydantic.dataclasses import dataclass

Expand All @@ -14,6 +15,9 @@ class QueryLogConfig:
# Number of days back of query logs to fetch, if 0, don't fetch query logs
lookback_days: int = 1

# (Optional) WorkGroups to collect query history, default to []. If not specify, collect from the primary workgroup
work_groups: List[str] = field(default_factory=list)

# Config to control query processing
process_query: ProcessQueryConfig = field(
default_factory=lambda: ProcessQueryConfig()
Expand Down
16 changes: 13 additions & 3 deletions metaphor/athena/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,20 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
return self._datasets.values()

def collect_query_logs(self) -> Iterator[QueryLog]:
# If empty, don't call list_query_executions with WorkGroup
work_groups = self._query_log_config.work_groups or [""]

if self._query_log_config.lookback_days > 0:
for page in self._paginate_and_dump_response("list_query_executions"):
ids = page["QueryExecutionIds"]
yield from self._batch_get_queries(ids)
for workgroup in work_groups:
params = {}
if workgroup:
params["WorkGroup"] = workgroup

for page in self._paginate_and_dump_response(
"list_query_executions", **params
):
ids = page["QueryExecutionIds"]
yield from self._batch_get_queries(ids)

def _get_catalogs(self):
database_names = []
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.129"
version = "0.14.130"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down

0 comments on commit c9f1c92

Please sign in to comment.