diff --git a/metaphor/bigquery/README.md b/metaphor/bigquery/README.md index a5e21d03..b22a0033 100644 --- a/metaphor/bigquery/README.md +++ b/metaphor/bigquery/README.md @@ -155,6 +155,9 @@ query_log: # (Optional) Fetch the full query SQL from job API if it's truncated in the audit metadata log, default True. fetch_job_query_if_truncated: + + # Maximum allowed requests per minute to the log entries API, default to 59 + max_requests_per_minute: ``` ##### Process Query Config diff --git a/metaphor/bigquery/config.py b/metaphor/bigquery/config.py index e0216d3b..21e6109a 100644 --- a/metaphor/bigquery/config.py +++ b/metaphor/bigquery/config.py @@ -15,6 +15,9 @@ # See https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list DEFAULT_QUERY_LOG_FETCH_SIZE = 1000 +# See https://cloud.google.com/logging/quotas +DEFAULT_MAX_REQUESTS_PER_MINUTE = 59 + @dataclass(config=ConnectorConfig) class BigQueryCredentials: @@ -62,6 +65,9 @@ class BigQueryQueryLogConfig: default_factory=lambda: ProcessQueryConfig() ) + # Maximum allowed requests per minute to the log entries API + max_requests_per_minute: int = DEFAULT_MAX_REQUESTS_PER_MINUTE + @dataclass(config=ConnectorConfig) class BigQueryRunConfig(BaseConfig): diff --git a/metaphor/bigquery/extractor.py b/metaphor/bigquery/extractor.py index 21786a00..1b3642c1 100644 --- a/metaphor/bigquery/extractor.py +++ b/metaphor/bigquery/extractor.py @@ -1,4 +1,5 @@ import re +import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime from typing import ( @@ -87,6 +88,7 @@ def __init__(self, config: BigQueryRunConfig) -> None: self._fetch_job_query_if_truncated = ( config.query_log.fetch_job_query_if_truncated ) + self._max_requests_per_minute = config.query_log.max_requests_per_minute self._datasets: List[Dataset] = [] @@ -306,15 +308,29 @@ def _fetch_query_logs(self, project_id: str) -> Iterator[QueryLog]: client = build_client(project_id, self._credentials) logging_client = 
build_logging_client(project_id, self._credentials) fetched_logs = 0 + + count = 0 + last_time = time.time() + for entry in logging_client.list_entries( page_size=self._query_log_fetch_size, filter_=log_filter ): + count += 1 + if JobChangeEvent.can_parse(entry): log = self._parse_job_change_entry(entry, client) if log: fetched_logs += 1 yield log + if count % self._query_log_fetch_size == 0: + current_time = time.time() + elapsed_time = current_time - last_time + wait_time = (60 / self._max_requests_per_minute) - elapsed_time + last_time = current_time + if wait_time > 0: + time.sleep(wait_time) + logger.info(f"Number of audit log entries fetched: {fetched_logs}") @staticmethod diff --git a/pyproject.toml b/pyproject.toml index 2afca9cf..a5706d81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.146" +version = "0.14.147" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "]