From eca4914243e340cd00ba30ba76aad2cdf7cef3ba Mon Sep 17 00:00:00 2001
From: darker
Date: Fri, 7 Jul 2023 15:06:40 +0200
Subject: [PATCH] feat: google big query database selection much faster
 [TCTC-6113] (#1126)

* poc: google big query database selection much faster
* feat: clean and update
* feat: update database fetch
* feat: update the db selection to check some things
* feat: update the field to db_schema
* oops, we shouldn't remove the database field
* chore: clean && update
* feat: keep the default database but also return the list of schemas
* chore: remove and clean up prints
* feat: refactor to add schema to get_model
* feat: we don't need project_tree in this connector
* feat: more updates with the new form of the schema_name
* test: update tests
* chore: update changelog
* fix: more tests
* test: coverage
* oops
---
 CHANGELOG.md                                    |   3 +
 .../google_big_query/test_google_big_query.py   | 249 +++++++++++++++---
 .../google_big_query_connector.py               | 112 +++++---
 3 files changed, 289 insertions(+), 75 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index db8a76eac..01750192c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,9 @@

 ## Unreleased

+### Changed
+
+- Feat [Google Big Query]: we can now get the database model (list of tables) based on a given schema name, to speed up building the project tree structure.
 - Fix: on mysql, avoid duplicated columns when retrieving table informations

 ### [4.6.0] 2023-06-02

diff --git a/tests/google_big_query/test_google_big_query.py b/tests/google_big_query/test_google_big_query.py
index 76c640bf3..37845abe4 100644
--- a/tests/google_big_query/test_google_big_query.py
+++ b/tests/google_big_query/test_google_big_query.py
@@ -1,3 +1,4 @@
+from typing import Any, Generator
 from unittest.mock import patch

 import pandas
@@ -180,8 +181,8 @@ class FakeResponse:
     def __init__(self) -> None:
         ...
-    def to_dataframe(self) -> pd.DataFrame:
-        return pd.DataFrame(
+    def to_dataframe(self) -> Generator[Any, Any, Any]:
+        yield pd.DataFrame(
             [
                 {
                     'name': 'coucou',
@@ -271,6 +272,11 @@ def to_dataframe(self) -> pd.DataFrame:
         return_value=Client,
     )

+    mocker.patch(
+        'toucan_connectors.google_big_query.google_big_query_connector.GoogleBigQueryConnector._fetch_query_results',
+        return_value=FakeResponse().to_dataframe(),
+    )
+
     mocker.patch(
         'toucan_connectors.google_big_query.google_big_query_connector.GoogleBigQueryConnector._get_google_credentials',
         return_value=Credentials,
     )
@@ -321,51 +327,156 @@ def to_dataframe(self) -> pd.DataFrame:
     assert (
         mocked_query.call_args_list[0][0][0]
         == """
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM foooo.INFORMATION_SCHEMA.COLUMNS C
-JOIN foooo.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    foooo.INFORMATION_SCHEMA.COLUMNS C
+    JOIN foooo.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 UNION ALL
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM baarrrr.INFORMATION_SCHEMA.COLUMNS C
-JOIN baarrrr.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    baarrrr.INFORMATION_SCHEMA.COLUMNS C
+    JOIN baarrrr.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 UNION ALL
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM taar.INFORMATION_SCHEMA.COLUMNS C
-JOIN taar.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    taar.INFORMATION_SCHEMA.COLUMNS C
+    JOIN taar.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 """
     )

     mocked_query.reset_mock()
+
+    mocker.patch(
+        'toucan_connectors.google_big_query.google_big_query_connector.GoogleBigQueryConnector._fetch_query_results',
+        return_value=FakeResponse().to_dataframe(),
+    )
+
     connector.get_model('some-db')
     assert (
         mocked_query.call_args_list[0][0][0]
         == """
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM foooo.INFORMATION_SCHEMA.COLUMNS C
-JOIN foooo.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    foooo.INFORMATION_SCHEMA.COLUMNS C
+    JOIN foooo.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 AND T.table_catalog = 'some-db'
 UNION ALL
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM baarrrr.INFORMATION_SCHEMA.COLUMNS C
-JOIN baarrrr.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    baarrrr.INFORMATION_SCHEMA.COLUMNS C
+    JOIN baarrrr.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 AND T.table_catalog = 'some-db'
 UNION ALL
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM taar.INFORMATION_SCHEMA.COLUMNS C
-JOIN taar.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    taar.INFORMATION_SCHEMA.COLUMNS C
+    JOIN taar.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
+AND T.table_catalog = 'some-db'
+"""
+    )
+
+    mocked_query.reset_mock()
+
+    mocker.patch(
+        'toucan_connectors.google_big_query.google_big_query_connector.GoogleBigQueryConnector._fetch_query_results',
+        return_value=FakeResponse().to_dataframe(),
+    )
+
+    connector.get_model('some-db', 'foooo')
+
+    # Since we specified only the foooo schema, we should only get the query
+    # for it
+    assert (
+        mocked_query.call_args_list[0][0][0]
+        == """
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    foooo.INFORMATION_SCHEMA.COLUMNS C
+    JOIN foooo.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
+AND T.table_catalog = 'some-db'
+"""
+    )
@@ -468,17 +579,39 @@ def test_get_model_multi_location(mocker: MockFixture, _fixture_credentials) ->
     assert (
         mocked_query.call_args_list[0][0][0]
         == """
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM foooo.INFORMATION_SCHEMA.COLUMNS C
-JOIN foooo.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    foooo.INFORMATION_SCHEMA.COLUMNS C
+    JOIN foooo.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 UNION ALL
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM baarrrr.INFORMATION_SCHEMA.COLUMNS C
-JOIN baarrrr.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    baarrrr.INFORMATION_SCHEMA.COLUMNS C
+    JOIN baarrrr.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 """
     )
     # No location should be specified in the happy path
@@ -486,10 +619,21 @@ def test_get_model_multi_location(mocker: MockFixture, _fixture_credentials) ->
     assert (
         mocked_query.call_args_list[1][0][0]
         == """
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM foooo.INFORMATION_SCHEMA.COLUMNS C
-JOIN foooo.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    foooo.INFORMATION_SCHEMA.COLUMNS C
+    JOIN foooo.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 """
     )
     # Next calls should specify the location
@@ -497,17 +641,36 @@ def test_get_model_multi_location(mocker: MockFixture, _fixture_credentials) ->
     assert (
         mocked_query.call_args_list[2][0][0]
         == """
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM baarrrr.INFORMATION_SCHEMA.COLUMNS C
-JOIN baarrrr.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    baarrrr.INFORMATION_SCHEMA.COLUMNS C
+    JOIN baarrrr.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 """
     )
     # Next calls should specify the location
     assert mocked_query.call_args_list[2][1] == {'location': 'Toulouse'}


-def test_get_form(_fixture_credentials: MockFixture) -> None:
+def test_get_form(mocker: MockFixture, _fixture_credentials: MockFixture) -> None:
+    def mock_available_schs():
+        return ['ok', 'test']
+
+    mocker.patch(
+        'toucan_connectors.google_big_query.google_big_query_connector.GoogleBigQueryConnector._available_schs',
+        return_value=mock_available_schs,
+    )
+
     assert (
         GoogleBigQueryDataSource(query=',', name='MyGBQ', domain='foo').get_form(
             GoogleBigQueryConnector(

diff --git a/toucan_connectors/google_big_query/google_big_query_connector.py b/toucan_connectors/google_big_query/google_big_query_connector.py
index 0fca8bbed..8404a7522 100644
--- a/toucan_connectors/google_big_query/google_big_query_connector.py
+++ b/toucan_connectors/google_big_query/google_big_query_connector.py
@@ -3,13 +3,13 @@
 from functools import cached_property
 from itertools import groupby
 from timeit import default_timer as timer
-from typing import Any, Dict, Iterable, List, Union
+from typing import Any, Dict, Generator, Iterable, List, Union

-import pandas
 import pandas as pd
 from google.api_core.exceptions import GoogleAPIError
 from google.cloud import bigquery
 from google.cloud.bigquery.dbapi import _helpers as bigquery_helpers
+from google.cloud.bigquery.job import QueryJob
 from google.oauth2.service_account import Credentials
 from pydantic import Field, create_model

@@ -20,10 +20,14 @@
     TableInfo,
     ToucanConnector,
     ToucanDataSource,
+    strlist_to_enum,
 )

 _LOGGER = logging.getLogger(__name__)

+_PAGE_SIZE = 50
+_MAXIMUM_RESULTS_FETCHED = 2000
+

 class Dialect(str, Enum):
     legacy = 'legacy'
@@ -43,15 +47,18 @@ class GoogleBigQueryDataSource(ToucanDataSource):
         **{'ui.hidden': True},
     )
     language: str = Field('sql', **{'ui.hidden': True})
-    database: str = Field(None)  # Needed for graphical selection in frontend but not used
-
-    class Config:
-        extra = 'ignore'
+    database: str = Field(None, **{'ui.hidden': True})
+    db_schema: str = Field(None, description='The name of the db_schema you want to query.')

     @classmethod
     def get_form(cls, connector: 'GoogleBigQueryConnector', current_config: dict[str, Any]):
-        schema = create_model('FormSchema', __base__=cls).schema()
+        schema = create_model(
+            'FormSchema',
+            db_schema=strlist_to_enum('db_schema', connector._available_schs),
+            __base__=cls,
+        ).schema()
         schema['properties']['database']['default'] = connector.credentials.project_id
+
         return schema

@@ -123,11 +130,8 @@ def _connect(credentials: Credentials) -> bigquery.Client:
         )
         return client

-    def project_tree(self) -> list[TableInfo]:
-        return self._get_project_structure()
-
     @staticmethod
-    def _execute_query(client: bigquery.Client, query: str, parameters: list) -> pandas.DataFrame:
+    def _execute_query(client: bigquery.Client, query: str, parameters: list) -> pd.DataFrame:
         try:
             start = timer()

@@ -226,21 +230,43 @@ def _format_columns(x: str):

     @staticmethod
     def _build_dataset_info_query_for_ds(dataset_id: str, db_name: str | None) -> str:
-        output = f"""
-SELECT C.table_name AS name, C.table_schema AS schema, T.table_catalog AS database,
-T.table_type AS type, C.column_name, C.data_type FROM {dataset_id}.INFORMATION_SCHEMA.COLUMNS C
-JOIN {dataset_id}.INFORMATION_SCHEMA.TABLES T ON C.table_name = T.table_name
-WHERE IS_SYSTEM_DEFINED='NO' AND IS_PARTITIONING_COLUMN='NO' AND IS_HIDDEN='NO'
+        query = f"""
+SELECT
+    C.table_name AS name,
+    C.table_schema AS schema,
+    T.table_catalog AS database,
+    T.table_type AS type,
+    C.column_name,
+    C.data_type
+FROM
+    {dataset_id}.INFORMATION_SCHEMA.COLUMNS C
+    JOIN {dataset_id}.INFORMATION_SCHEMA.TABLES T
+        ON C.table_name = T.table_name
+WHERE
+    IS_SYSTEM_DEFINED = 'NO'
+    AND IS_PARTITIONING_COLUMN = 'NO'
+    AND IS_HIDDEN = 'NO'
 """
+
         if db_name is not None:
-            output += f"AND T.table_catalog = '{db_name}'\n"
-        return output
+            query += f"AND T.table_catalog = '{db_name}'\n"
+
+        return query

     def _build_dataset_info_query(self, dataset_ids: Iterable[str], db_name: str | None) -> str:
         return '\nUNION ALL\n'.join(
             self._build_dataset_info_query_for_ds(dataset_id, db_name) for dataset_id in dataset_ids
         )

+    def _fetch_query_results(
+        self, query_job: QueryJob
+    ) -> Generator[Any, Any, Any]:  # pragma: no cover
+        """Fetches query results in a paginated manner using a generator."""
+        row_iterator = query_job.result(page_size=_PAGE_SIZE, max_results=_MAXIMUM_RESULTS_FETCHED)
+
+        while rows := next(row_iterator.pages, None):
+            yield rows.to_dataframe()
+
     def _get_project_structure_fast(
         self, client: bigquery.Client, db_name: str | None, dataset_ids: Iterable[str]
     ) -> pd.DataFrame:
@@ -249,7 +275,14 @@ def _get_project_structure_fast(
         Only works if all datasets are in the same location.
""" query = self._build_dataset_info_query(dataset_ids, db_name) - return client.query(query).to_dataframe() + + try: + query_job = client.query(query) + # Fetch pages of results using the generator + # and Concatenate all dataframes into a single one + return pd.concat((df for df in self._fetch_query_results(query_job)), ignore_index=True) + except Exception as exc: + raise GoogleAPIError(f'An error occurred while executing the query: {exc}') from exc def _get_project_structure_slow( self, client: bigquery.Client, db_name: str | None, dataset_ids: Iterable[str] @@ -276,28 +309,43 @@ def _get_project_structure_slow( # Then, we returning a single dataframe containing all results return pd.concat(dfs) - def _get_project_structure(self, db_name: str | None = None) -> List[TableInfo]: + @cached_property + def _available_schs(self) -> list[str]: # pragma: no cover + credentials = self._get_google_credentials(self.credentials, self.scopes) + client = bigquery.Client(location=None, credentials=credentials) + + return pd.Series((ds.dataset_id for ds in client.list_datasets())).values + + def _get_project_structure( + self, db_name: str | None = None, schema_name: str | None = None + ) -> List[TableInfo]: creds = self._get_google_credentials(self.credentials, self.scopes) client = self._connect(creds) - datasets = list(client.list_datasets()) - # Here, we're trying to retrieve table info for all datasets at once. However, this will - # only work if all datasets are in same location. Unfortunately, there is no way to - # retrieve the location along with the dataset list, so we're optimistic here. + + # Either the schema_name is not specified + if schema_name is None: + dataset_ids = [ds.dataset_id for ds in list(client.list_datasets())] + else: + # if we already now the dataset/schema, we should be able to just + # fetch it instead of all of them + dataset_ids = [schema_name] + try: - df = self._get_project_structure_fast( - client, db_name, (ds.dataset_id for ds in datasets) - ) + # Here, we're trying to retrieve table info for all datasets at once. However, this will + # only work if all datasets are in same location. Unfortunately, there is no way to + # retrieve the location along with the dataset list, so we're optimistic here. + df = self._get_project_structure_fast(client, db_name, dataset_ids) except GoogleAPIError as exc: _LOGGER.info( f'Got an exception when trying to retrieve domains for project: {exc}. ' 'Falling back on listing by location...' ) - df = self._get_project_structure_slow( - client, db_name, (ds.dataset_id for ds in datasets) - ) + df = self._get_project_structure_slow(client, db_name, dataset_ids) return self._format_db_model(df) - def get_model(self, db_name: str | None = None) -> list[TableInfo]: + def get_model( + self, db_name: str | None = None, schema_name: str | None = None + ) -> list[TableInfo]: """Retrieves the database tree structure using current connection""" - return self._get_project_structure(db_name) + return self._get_project_structure(db_name, schema_name)