From 66e4732bd3c7f84b7618f8dd1f0ebb9a6a445ace Mon Sep 17 00:00:00 2001 From: zee2theodd <67772592+zee2theodd@users.noreply.github.com> Date: Mon, 28 Oct 2024 14:59:01 -0400 Subject: [PATCH] Merged bigquery.py from Spotify's Luigi repository --- luigi/contrib/bigquery.py | 79 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index 55fc3b72c8..5d93efdaed 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -21,14 +21,38 @@ import time from luigi.contrib import gcp +from tenacity import retry +from tenacity import retry_if_exception +from tenacity import retry_if_exception_type +from tenacity import wait_exponential +from tenacity import stop_after_attempt + logger = logging.getLogger('luigi-interface') +RETRYABLE_ERRORS = None try: + import httplib2 from googleapiclient import discovery + from googleapiclient import errors from googleapiclient import http except ImportError: logger.warning('BigQuery module imported, but google-api-python-client is ' 'not installed. Any BigQuery task will fail') +else: + RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError, TimeoutError, BrokenPipeError) + + +# Retry configurations. For more details, see https://tenacity.readthedocs.io/en/latest/ +def is_error_5xx(err): + return isinstance(err, errors.HttpError) and err.resp.status >= 500 + + +bq_retry = retry(retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)), + wait=wait_exponential(multiplier=1, min=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + after=lambda x: x.args[0]._initialise_client() + ) class CreateDisposition: @@ -52,6 +76,7 @@ class SourceFormat: CSV = 'CSV' DATASTORE_BACKUP = 'DATASTORE_BACKUP' NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON' + PARQUET = 'PARQUET' class FieldDelimiter: @@ -121,13 +146,23 @@ class BigQueryClient: """ def __init__(self, oauth_credentials=None, descriptor='', http_=None): - authenticate_kwargs = gcp.get_authenticate_kwargs(oauth_credentials, http_) + # Save initialisation arguments in case we need to re-create client + # due to connection timeout + self.oauth_credentials = oauth_credentials + self.descriptor = descriptor + self.http_ = http_ + + self._initialise_client() - if descriptor: - self.client = discovery.build_from_document(descriptor, **authenticate_kwargs) + def _initialise_client(self): + authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_) + + if self.descriptor: + self.client = discovery.build_from_document(self.descriptor, **authenticate_kwargs) else: self.client = discovery.build('bigquery', 'v2', cache_discovery=False, **authenticate_kwargs) + @bq_retry def dataset_exists(self, dataset): """Returns whether the given dataset exists. If regional location is specified for the dataset, that is also checked @@ -146,7 +181,6 @@ def dataset_exists(self, dataset): raise Exception('''Dataset already exists with regional location {}. Can't use {}.'''.format( fetched_location if fetched_location is not None else 'unspecified', dataset.location)) - except http.HttpError as ex: if ex.resp.status == 404: return False @@ -154,6 +188,7 @@ def dataset_exists(self, dataset): return True + @bq_retry def table_exists(self, table): """Returns whether the given table exists. @@ -527,6 +562,16 @@ def allow_quoted_new_lines(self): """ Indicates if BigQuery should allow quoted data sections that contain newline characters in a CSV file. The default value is false.""" return False + def configure_job(self, configuration): + """Set additional job configuration. + + This allows to specify job configuration parameters that are not exposed via Task properties. + + :param configuration: Current configuration. + :return: New or updated configuration. + """ + return configuration + def run(self): output = self.output() assert isinstance(output, BigQueryTarget), 'Output must be a BigQueryTarget, not %s' % (output) @@ -565,6 +610,8 @@ def run(self): else: job['configuration']['load']['autodetect'] = True + job['configuration'] = self.configure_job(job['configuration']) + bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset) @@ -610,6 +657,16 @@ def use_legacy_sql(self): """ return True + def configure_job(self, configuration): + """Set additional job configuration. + + This allows to specify job configuration parameters that are not exposed via Task properties. + + :param configuration: Current configuration. + :return: New or updated configuration. + """ + return configuration + def run(self): output = self.output() assert isinstance(output, BigQueryTarget), 'Output must be a BigQueryTarget, not %s' % (output) @@ -643,6 +700,8 @@ def run(self): } } + job['configuration'] = self.configure_job(job['configuration']) + bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset) @@ -739,6 +798,16 @@ def compression(self): """Whether to use compression.""" return Compression.NONE + def configure_job(self, configuration): + """Set additional job configuration. + + This allows to specify job configuration parameters that are not exposed via Task properties. + + :param configuration: Current configuration. + :return: New or updated configuration. + """ + return configuration + def run(self): input = luigi.task.flatten(self.input())[0] assert ( @@ -775,6 +844,8 @@ def run(self): job['configuration']['extract']['fieldDelimiter'] = \ self.field_delimiter + job['configuration'] = self.configure_job(job['configuration']) + bq_client.run_job( input.table.project_id, job,