Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged bigquery.py from Spotify's Luigi repository #3319

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 75 additions & 4 deletions luigi/contrib/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,38 @@
import time
from luigi.contrib import gcp

from tenacity import retry
from tenacity import retry_if_exception
from tenacity import retry_if_exception_type
from tenacity import wait_exponential
from tenacity import stop_after_attempt

logger = logging.getLogger('luigi-interface')

# Optional dependency guard: the module must remain importable even when
# google-api-python-client is absent, so the failure is only logged here
# and every BigQuery task fails later at call time instead of at import.
RETRYABLE_ERRORS = None
try:
    import httplib2
    from googleapiclient import discovery
    from googleapiclient import errors
    from googleapiclient import http
except ImportError:
    logger.warning('BigQuery module imported, but google-api-python-client is '
                   'not installed. Any BigQuery task will fail')
else:
    # Transport-level exceptions that are safe to retry transparently.
    # NOTE(review): stays None when the import above fails — consumers must
    # tolerate that (see the retry configuration below).
    RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError, TimeoutError, BrokenPipeError)


# Retry configurations. For more details, see https://tenacity.readthedocs.io/en/latest/
def is_error_5xx(err):
    """Tenacity predicate: True for googleapiclient HTTP errors whose
    response carries a server-side (5xx) status code."""
    if isinstance(err, errors.HttpError):
        return err.resp.status >= 500
    return False


# Retry transient failures: transport-level errors and HTTP 5xx responses.
# For details on the knobs see https://tenacity.readthedocs.io/en/latest/
bq_retry = retry(
    # RETRYABLE_ERRORS is None when google-api-python-client failed to
    # import; fall back to an empty tuple so the predicate simply never
    # matches instead of raising TypeError inside isinstance().
    retry=(retry_if_exception(is_error_5xx)
           | retry_if_exception_type(RETRYABLE_ERRORS or ())),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    stop=stop_after_attempt(3),
    reraise=True,
    # The decorated callables are bound methods, so args[0] is the
    # BigQueryClient instance; rebuild its API client between attempts
    # because a timed-out connection cannot be reused.
    after=lambda retry_state: retry_state.args[0]._initialise_client(),
)


class CreateDisposition:
Expand All @@ -52,6 +76,7 @@ class SourceFormat:
CSV = 'CSV'
DATASTORE_BACKUP = 'DATASTORE_BACKUP'
NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
PARQUET = 'PARQUET'


class FieldDelimiter:
Expand Down Expand Up @@ -121,13 +146,23 @@ class BigQueryClient:
"""

def __init__(self, oauth_credentials=None, descriptor='', http_=None):
    """Create a BigQuery client wrapper.

    :param oauth_credentials: optional OAuth credentials; resolved through
        ``gcp.get_authenticate_kwargs`` when the API client is built.
    :param descriptor: optional service document; when non-empty the API
        client is built from it instead of the live discovery service.
    :param http_: optional HTTP transport object.
    """
    # Save initialisation arguments in case we need to re-create the client
    # due to a connection timeout (the retry decorator calls
    # _initialise_client between attempts).
    self.oauth_credentials = oauth_credentials
    self.descriptor = descriptor
    self.http_ = http_

    self._initialise_client()

def _initialise_client(self):
    """(Re)build ``self.client`` from the saved initialisation arguments.

    Called from ``__init__`` and again by the retry decorator after a
    failed attempt, since a timed-out connection cannot be reused.
    """
    authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_)

    if self.descriptor:
        # Build from a pre-fetched service document.
        self.client = discovery.build_from_document(self.descriptor, **authenticate_kwargs)
    else:
        self.client = discovery.build('bigquery', 'v2', cache_discovery=False, **authenticate_kwargs)

@bq_retry
def dataset_exists(self, dataset):
"""Returns whether the given dataset exists.
If regional location is specified for the dataset, that is also checked
Expand All @@ -146,14 +181,14 @@ def dataset_exists(self, dataset):
raise Exception('''Dataset already exists with regional location {}. Can't use {}.'''.format(
fetched_location if fetched_location is not None else 'unspecified',
dataset.location))

except http.HttpError as ex:
if ex.resp.status == 404:
return False
raise

return True

@bq_retry
def table_exists(self, table):
"""Returns whether the given table exists.

Expand Down Expand Up @@ -527,6 +562,16 @@ def allow_quoted_new_lines(self):
""" Indicates if BigQuery should allow quoted data sections that contain newline characters in a CSV file. The default value is false."""
return False

def configure_job(self, configuration):
    """Hook for subclasses to inject extra load-job configuration.

    Override this to set job parameters that are not exposed as Task
    properties.

    :param configuration: the job configuration built so far.
    :return: the (possibly modified) configuration to submit.
    """
    # Default implementation passes the configuration through untouched.
    return configuration

def run(self):
output = self.output()
assert isinstance(output, BigQueryTarget), 'Output must be a BigQueryTarget, not %s' % (output)
Expand Down Expand Up @@ -565,6 +610,8 @@ def run(self):
else:
job['configuration']['load']['autodetect'] = True

job['configuration'] = self.configure_job(job['configuration'])

bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)


Expand Down Expand Up @@ -610,6 +657,16 @@ def use_legacy_sql(self):
"""
return True

def configure_job(self, configuration):
    """Extension point: adjust the query-job configuration before submission.

    Subclasses may override this to set job parameters that have no
    corresponding Task property.

    :param configuration: the configuration assembled so far.
    :return: the configuration to send to BigQuery (unchanged by default).
    """
    return configuration

def run(self):
output = self.output()
assert isinstance(output, BigQueryTarget), 'Output must be a BigQueryTarget, not %s' % (output)
Expand Down Expand Up @@ -643,6 +700,8 @@ def run(self):
}
}

job['configuration'] = self.configure_job(job['configuration'])

bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)


Expand Down Expand Up @@ -739,6 +798,16 @@ def compression(self):
"""Whether to use compression."""
return Compression.NONE

def configure_job(self, configuration):
    """Customisation hook for the extract-job configuration.

    Override in a subclass to tweak job parameters that are not exposed
    via Task properties; the default is a no-op pass-through.

    :param configuration: the configuration assembled so far.
    :return: the configuration to submit.
    """
    return configuration

def run(self):
input = luigi.task.flatten(self.input())[0]
assert (
Expand Down Expand Up @@ -775,6 +844,8 @@ def run(self):
job['configuration']['extract']['fieldDelimiter'] = \
self.field_delimiter

job['configuration'] = self.configure_job(job['configuration'])

bq_client.run_job(
input.table.project_id,
job,
Expand Down
Loading