Add rate limit config for BigQuery crawler (#1032)
* throttle the API call fetching BQ log entries

* Bump version

* Update readme

* refine

* refine

* Update metaphor/bigquery/config.py

Co-authored-by: Tsung-Ju Lii <[email protected]>

* address reviewer comment

* Change default value

* Add doc

* Bump version

---------

Co-authored-by: Tsung-Ju Lii <[email protected]>
elic-eon and usefulalgorithm authored Nov 5, 2024
1 parent c4a07e9 commit 1f045f2
Showing 4 changed files with 26 additions and 1 deletion.
3 changes: 3 additions & 0 deletions metaphor/bigquery/README.md
@@ -155,6 +155,9 @@ query_log:
# (Optional) Fetch the full query SQL from job API if it's truncated in the audit metadata log, default True.
fetch_job_query_if_truncated: <boolean>
# Maximum allowed requests per minute to the log entries API, default to 59
max_requests_per_minute: <requests_per_minute>
```

##### Process Query Config
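As a quick illustration (not part of the commit), the setting translates into a minimum spacing between log-entries page requests of 60 / max_requests_per_minute seconds:

```python
# Illustrative arithmetic only: spacing implied by a few example values.
for rpm in (59, 30, 10):
    min_seconds_between_pages = 60 / rpm
    print(f"max_requests_per_minute={rpm} -> at least {min_seconds_between_pages:.2f}s between pages")
```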
6 changes: 6 additions & 0 deletions metaphor/bigquery/config.py
@@ -15,6 +15,9 @@
# See https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list
DEFAULT_QUERY_LOG_FETCH_SIZE = 1000

# See https://cloud.google.com/logging/quotas
DEFAULT_MAX_REQUESTS_PER_MINUTE = 59


@dataclass(config=ConnectorConfig)
class BigQueryCredentials:
@@ -62,6 +65,9 @@ class BigQueryQueryLogConfig:
        default_factory=lambda: ProcessQueryConfig()
    )

    # Maximum allowed requests per minute to the log entries API
    max_requests_per_minute: int = DEFAULT_MAX_REQUESTS_PER_MINUTE


@dataclass(config=ConnectorConfig)
class BigQueryRunConfig(BaseConfig):
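A minimal usage sketch (not from the commit), assuming the remaining `BigQueryQueryLogConfig` fields, which are not shown in this hunk, also have defaults:

```python
from metaphor.bigquery.config import (
    DEFAULT_MAX_REQUESTS_PER_MINUTE,
    BigQueryQueryLogConfig,
)

# Default stays just under the documented per-minute quota for entries.list.
query_log = BigQueryQueryLogConfig()
assert query_log.max_requests_per_minute == DEFAULT_MAX_REQUESTS_PER_MINUTE  # 59

# A stricter limit, e.g. as it would arrive from the crawler's YAML config.
slow_query_log = BigQueryQueryLogConfig(max_requests_per_minute=30)
```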
16 changes: 16 additions & 0 deletions metaphor/bigquery/extractor.py
@@ -1,4 +1,5 @@
import re
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import (
@@ -87,6 +88,7 @@ def __init__(self, config: BigQueryRunConfig) -> None:
        self._fetch_job_query_if_truncated = (
            config.query_log.fetch_job_query_if_truncated
        )
        self._max_requests_per_minute = config.query_log.max_requests_per_minute

        self._datasets: List[Dataset] = []

@@ -306,15 +308,29 @@ def _fetch_query_logs(self, project_id: str) -> Iterator[QueryLog]:
        client = build_client(project_id, self._credentials)
        logging_client = build_logging_client(project_id, self._credentials)
        fetched_logs = 0

        count = 0
        last_time = time.time()

        for entry in logging_client.list_entries(
            page_size=self._query_log_fetch_size, filter_=log_filter
        ):
            count += 1

            if JobChangeEvent.can_parse(entry):
                log = self._parse_job_change_entry(entry, client)
                if log:
                    fetched_logs += 1
                    yield log

            if count % self._query_log_fetch_size == 0:
                current_time = time.time()
                elapsed_time = current_time - last_time
                wait_time = (60 / self._max_requests_per_minute) - elapsed_time
                last_time = current_time
                if wait_time > 0:
                    time.sleep(wait_time)

        logger.info(f"Number of audit log entries fetched: {fetched_logs}")

@staticmethod
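The throttling in `_fetch_query_logs` boils down to a simple pacing pattern: count entries, and after every `page_size` of them (roughly one `list_entries` API request) sleep off whatever remains of the 60 / max_requests_per_minute budget. A self-contained sketch of that pattern follows; the names are illustrative, not the connector's actual API.

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def paced(entries: Iterable[T], page_size: int, max_requests_per_minute: int) -> Iterator[T]:
    """Yield entries while keeping page fetches at or below the per-minute limit."""
    count = 0
    last_time = time.time()
    for entry in entries:
        count += 1
        yield entry
        # A full page (~one API request) has been consumed; pace the next fetch.
        if count % page_size == 0:
            wait = (60 / max_requests_per_minute) - (time.time() - last_time)
            last_time = time.time()
            if wait > 0:
                time.sleep(wait)


# Example: cap at ~120 pages per minute, i.e. at least 0.5s between pages.
for _ in paced(range(6), page_size=2, max_requests_per_minute=120):
    pass
```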
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.146"
version = "0.14.147"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
