Skip to content

Commit

Permalink
feat(ingest/bigquery): Add query job retries for transient errors (da…
Browse files Browse the repository at this point in the history
…tahub-project#11162)

Co-authored-by: Gabe Lyons <[email protected]>
Co-authored-by: Gabe Lyons <[email protected]>
  • Loading branch information
3 people authored Aug 27, 2024
1 parent 414dc54 commit 56a563b
Showing 1 changed file with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from google.api_core import retry
from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
from google.cloud.bigquery import retry as bq_retry
from google.cloud.bigquery.table import (
RowIterator,
TableListItem,
Expand Down Expand Up @@ -155,8 +156,23 @@ def __init__(
self.datacatalog_client = datacatalog_client

def get_query_result(self, query: str) -> RowIterator:
def _should_retry(exc: BaseException) -> bool:
logger.debug(f"Exception occured for job query. Reason: {exc}")
# Jobs sometimes fail with transient errors.
# This is not currently handled by the python-bigquery client.
# https://github.com/googleapis/python-bigquery/issues/23
return "Retrying the job may solve the problem" in str(exc)

logger.debug(f"Query : {query}")
resp = self.bq_client.query(query)
resp = self.bq_client.query(
query,
job_retry=retry.Retry(
predicate=lambda exc: (
bq_retry.DEFAULT_JOB_RETRY._predicate(exc) or _should_retry(exc)
),
deadline=bq_retry.DEFAULT_JOB_RETRY._deadline,
),
)
return resp.result()

def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]:
Expand Down

0 comments on commit 56a563b

Please sign in to comment.