allows naming conventions to be changed #998

Merged
merged 114 commits into devel from rfix/allows-naming-conventions on Jun 26, 2024
Commits (114) — the diff shown further below reflects changes from a single commit.
93995b9
allows to decorate async function with dlt.source
rudolfix Feb 20, 2024
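Commit 93995b9 lets dlt.source decorate an async function. A minimal sketch of what such a source might look like, assuming the decorated coroutine is awaited by the pipeline and returns resources the same way a regular source does; the endpoint, the httpx client choice, and the data shape are illustrative, not part of the PR:

import dlt
import httpx  # hypothetical HTTP client choice, not part of the PR

@dlt.source
async def issues_source(api_url: str = "https://example.com/api"):
    # the source body may await async setup work before producing resources
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{api_url}/issues")
    # build a regular resource from the fetched payload
    return dlt.resource(resp.json(), name="issues")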
4444878
adds pytest-async and updates pytest to 7.x
rudolfix Feb 20, 2024
b3b70f6
fixes forked teardown issue 7.x
rudolfix Feb 20, 2024
f5d7a0a
bumps deps for py 3.12
rudolfix Feb 21, 2024
83dc38a
adds py 12 common tests
rudolfix Feb 21, 2024
21ebfee
fixes typings after deps bump
rudolfix Feb 21, 2024
7985f9d
bumps airflow, yanks duckdb to 0.9.2
rudolfix Feb 21, 2024
07f285e
fixes tests
rudolfix Feb 21, 2024
06e441e
fixes pandas version
rudolfix Feb 21, 2024
3e846a1
adds 3.12 duckdb dep
rudolfix Feb 22, 2024
37b4a31
Merge branch 'devel' into rfix/enables-async-source
rudolfix Feb 22, 2024
934c167
Merge branch 'devel' into rfix/enables-async-source
rudolfix Feb 24, 2024
7fa574d
adds right hand pipe operator
rudolfix Feb 24, 2024
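Commit 7fa574d adds a right-hand pipe operator. A sketch of what this appears to enable — piping a plain iterable into a transformer from the right, alongside the existing resource | transformer form; names and data are made up:

import dlt

@dlt.transformer
def enriched(item):
    # attach a flag to every incoming item
    yield {**item, "processed": True}

# existing form: a resource piped into a transformer
rows = dlt.resource([{"id": 1}, {"id": 2}], name="rows")
left_piped = rows | enriched

# right-hand pipe: a plain iterable on the left-hand side of the operator
right_piped = [{"id": 3}, {"id": 4}] | enriched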
8c7942d
fixes docker ci build
rudolfix Feb 24, 2024
f951fc0
adds docs on async sources and resources
rudolfix Feb 24, 2024
387a7c7
normalizes default hints and preferred types in schema
rudolfix Feb 25, 2024
88728e1
defines pipeline state table in utils, column normalization in simple…
rudolfix Feb 25, 2024
1a53425
normalizes all identifiers used by relational normalizer, fixes other…
rudolfix Feb 25, 2024
8835023
fixes sql job client to use normalized identifiers in queries
rudolfix Feb 25, 2024
f4c504f
runs state sync tests for lower and upper case naming conventions
rudolfix Feb 25, 2024
874cc29
fixes weaviate to use normalized identifiers in queries
rudolfix Feb 25, 2024
c4e9f35
partially fixes qdrant incorrect state and version retrieval queries
rudolfix Feb 25, 2024
6345377
initial sql uppercase naming convention
rudolfix Feb 25, 2024
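Commit 6345377 introduces an uppercase SQL naming convention. A sketch of selecting a naming convention through configuration; the SCHEMA__NAMING environment variable and the "sql_upper" module name are assumptions based on this PR's commits and docs changes:

import os

# choose the naming convention before dlt resolves its configuration
os.environ["SCHEMA__NAMING"] = "sql_upper"  # assumed module name

import dlt

pipeline = dlt.pipeline(pipeline_name="upper_case_demo", destination="snowflake")
pipeline.run([{"UserName": "ada"}], table_name="users")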
96a02ff
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Mar 8, 2024
aef8cc2
adds native df readers to databricks and bigquery
rudolfix Mar 9, 2024
a53c00b
adds casing identifier capability to support different casing in nami…
rudolfix Mar 9, 2024
91f5780
cleans typing for relational normalizer
rudolfix Mar 9, 2024
5984824
renames escape functions
rudolfix Mar 18, 2024
3458441
destination capabilities for case fold and case sensitivity
rudolfix Mar 18, 2024
55362b0
drops supports naming module and allows naming to be instance in conf…
rudolfix Mar 18, 2024
b836dfe
checks all tables in information schema in one go, observes case fold…
rudolfix Mar 18, 2024
e50bfaa
moves schema verification to destination utils
rudolfix Mar 18, 2024
42d149f
adds method to remove processing hints from schema, helper functions …
rudolfix Mar 18, 2024
c53808f
accepts naming convention instances when resolving configs
rudolfix Mar 18, 2024
b97ae53
fixes the cloning of schema in decorator, removes processing hints
rudolfix Mar 18, 2024
0132c2f
removes processing hints when saving imported schema
rudolfix Mar 18, 2024
2a7c5dd
adds docs on naming conventions, removes technical docs
rudolfix Mar 18, 2024
d502c7c
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 4, 2024
3e7504b
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 6, 2024
3bb929f
adds casing info to databrick caps, makes caps an instance attr
rudolfix Jun 11, 2024
9f0920c
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 11, 2024
724dc15
adjusts destination casing in caps from schema naming and config
rudolfix Jun 11, 2024
b58a118
raises detailed schema identifier clash exceptions
rudolfix Jun 11, 2024
d190ea1
adds is_case_sensitive and name to NamingConvention
rudolfix Jun 11, 2024
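Commit d190ea1 adds is_case_sensitive and name to NamingConvention. A sketch of a custom convention written against that interface; the base-class import path and the exact shape of is_case_sensitive are assumptions:

from dlt.common.normalizers.naming.naming import NamingConvention  # assumed path


class UpperSnake(NamingConvention):
    """Assumed example: identifiers normalized by the base class, then upper-cased."""

    @property
    def is_case_sensitive(self) -> bool:
        # an upper-case-only convention does not use case to distinguish names
        return False

    def normalize_identifier(self, identifier: str) -> str:
        identifier = super().normalize_identifier(identifier)
        return identifier.upper()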
b445654
adds sanity check if _dlt prefix is preserved
rudolfix Jun 11, 2024
ee8a95b
finds generic types in non-generic classes deriving from generic
rudolfix Jun 11, 2024
eb30838
uses casefold INSERT VALUES job column names
rudolfix Jun 11, 2024
558db91
adds a method make_qualified_table_name_path that calculates componen…
rudolfix Jun 11, 2024
dea9669
adds casing info to destinations, caps as instance attrs, custom tabl…
rudolfix Jun 11, 2024
b1e2b09
adds naming convention to restore state tests, make them essential
rudolfix Jun 11, 2024
210be70
fixes table builder tests
rudolfix Jun 11, 2024
95b703d
removes processing hints when exporting schema to import folder, warn…
rudolfix Jun 12, 2024
4b72b77
allows to subclass INFO SCHEMA query generation and uses specialized …
rudolfix Jun 12, 2024
ab39e06
uses correct schema escaping function in sql jobs
rudolfix Jun 12, 2024
2ae3ad2
passes pipeline state to package state via extract
rudolfix Jun 12, 2024
09b7731
fixes optional normalizers module
rudolfix Jun 12, 2024
cfd3e5f
excludes version_hash from pipeline state SELECT
rudolfix Jun 12, 2024
0edbbfd
passes pipeline state to package state pt.2
rudolfix Jun 12, 2024
5769ba1
re-enables sentry tests
rudolfix Jun 12, 2024
1f17a44
bumps qdrant client, makes test running for local version
rudolfix Jun 12, 2024
71e418b
makes weaviate running
rudolfix Jun 12, 2024
ce414e1
uses schemata to find databases on athena
rudolfix Jun 13, 2024
bde61a9
uses api get_table for hidden dataset on bigquery to reflect schemas,…
rudolfix Jun 13, 2024
036e3dd
adds naming conventions to two restore state tests
rudolfix Jun 13, 2024
8546763
fixes escape identifiers to column escape
rudolfix Jun 13, 2024
f57e286
fix conflicts in docs
rudolfix Jun 13, 2024
cf50bd4
adjusts capabilities in capabilities() method, uses config and naming…
rudolfix Jun 15, 2024
72969ce
allows to add props to classes without vectorizer in weaviate
rudolfix Jun 15, 2024
656d5fc
moves caps function into factories, cleans up adapters and custom dest…
rudolfix Jun 15, 2024
bbd7fe6
sentry_dsn
rudolfix Jun 15, 2024
a671508
adds basic destination reference tests
rudolfix Jun 15, 2024
81e0db9
fixes table builder tests
rudolfix Jun 15, 2024
8a32793
fix deps and docs
rudolfix Jun 15, 2024
0dc6dc8
fixes more tests
rudolfix Jun 16, 2024
4a39795
case sensitivity docs stubs
rudolfix Jun 17, 2024
43d6d5f
fixes drop_pipeline fixture
rudolfix Jun 17, 2024
e3d998c
improves partial config generation for capabilities
rudolfix Jun 17, 2024
3aef3fd
adds snowflake csv support
rudolfix Jun 17, 2024
6df7a34
creates separate csv tests
rudolfix Jun 17, 2024
57aec2e
allows to import files into extract storage, adds import file writer …
rudolfix Jun 19, 2024
fee7af5
handles ImportFileMeta in extractor
rudolfix Jun 19, 2024
96c7222
adds import file item normalizer and router to normalize
rudolfix Jun 19, 2024
116add0
supports csv format config for snowflake
rudolfix Jun 19, 2024
42eacaf
removes realpath wherever possible and adds fast make_full_path to Fi…
rudolfix Jun 20, 2024
3793d06
adds additional methods to load_package storage to make listings faster
rudolfix Jun 20, 2024
88eec9c
adds file_format to dlt.resource, uses preferred file format for dlt …
rudolfix Jun 21, 2024
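Commits 3aef3fd and 88eec9c add Snowflake CSV support and a per-resource file format. A sketch of combining the two; the resource name and rows are placeholders, and file_format="csv" as a dlt.resource argument is an assumption taken from the commit message:

import dlt

@dlt.resource(file_format="csv")  # ask dlt to write this resource as CSV (assumed argument)
def events():
    yield [{"id": 1, "name": "login"}, {"id": 2, "name": "logout"}]

pipeline = dlt.pipeline(pipeline_name="csv_demo", destination="snowflake")
pipeline.run(events())
# the format can also be forced for a whole load:
# pipeline.run(events(), loader_file_format="csv")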
8e0f0a8
docs for importing files, file_format
rudolfix Jun 21, 2024
b1c095c
code improvements and tests
rudolfix Jun 21, 2024
46ec732
docs hard links note
rudolfix Jun 21, 2024
2194b18
Merge pull request #1479 from dlt-hub/feat/snowflake-csv-support
rudolfix Jun 21, 2024
1384ed3
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 21, 2024
b00cbb2
moves loader parallelism test to pipelines, solves duckdb ci test err…
rudolfix Jun 23, 2024
a530345
fixes tests
rudolfix Jun 23, 2024
4271895
moves drop_pipeline fixture level up
rudolfix Jun 23, 2024
abd02df
drops default naming convention from caps so naming in saved schema p…
rudolfix Jun 24, 2024
14b4b0e
unifies all representations of pipeline state
rudolfix Jun 24, 2024
60e45b1
tries to decompress text file first in fs_client
rudolfix Jun 24, 2024
a84be2a
tests get stored state in test_job_client
rudolfix Jun 24, 2024
1dc7a09
removes credentials from dlt.attach, adds destination and staging fa…
rudolfix Jun 24, 2024
ab69b76
cleans up env variables and pipeline dropping fixture precedence
rudolfix Jun 24, 2024
0eeb21d
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 24, 2024
f1097d8
removes dev_mode from dlt.attach
rudolfix Jun 24, 2024
3855fcc
adds missing arguments to filesystem factory
rudolfix Jun 24, 2024
651412e
fixes tests
rudolfix Jun 24, 2024
aab36e1
updates destination and naming convention docs
rudolfix Jun 25, 2024
7294aae
removes is_case_sensitive from naming convention initializer
rudolfix Jun 26, 2024
dc10473
simplifies with_file_import mark
rudolfix Jun 26, 2024
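Commits 57aec2e and dc10473 allow files to be imported into the extract storage via a with_file_import mark. A sketch of handing an already prepared CSV file to a load package instead of yielding rows; the helper's exact signature, the path, and the row count are assumptions:

import dlt

@dlt.resource(name="orders", file_format="csv")
def orders():
    # yield a file reference instead of items; 10_000 is the assumed expected row count
    yield dlt.mark.with_file_import("data/orders.csv", "csv", 10_000)

pipeline = dlt.pipeline(pipeline_name="import_demo", destination="snowflake")
pipeline.run(orders())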
727a35e
adds case sensitivity tests
rudolfix Jun 26, 2024
4cb2646
uses dev_mode everywhere
rudolfix Jun 26, 2024
f098e5a
improves csv docs
rudolfix Jun 26, 2024
1521778
fixes encodings in fsspec
rudolfix Jun 26, 2024
796483e
improves naming convention docs
rudolfix Jun 26, 2024
534c7f8
fixes tests and renames clash to collision
rudolfix Jun 26, 2024
5f4cb4c
fixes getting original bases from instance
rudolfix Jun 26, 2024
37 changes: 25 additions & 12 deletions dlt/destinations/impl/bigquery/sql_client.py
@@ -1,5 +1,5 @@
from contextlib import contextmanager
from typing import Any, AnyStr, ClassVar, Iterator, List, Optional, Sequence
from typing import Any, AnyStr, ClassVar, Iterator, List, Optional, Sequence, Generator

import google.cloud.bigquery as bigquery # noqa: I250
from google.api_core import exceptions as api_core_exceptions
@@ -8,6 +8,7 @@
from google.cloud.bigquery.dbapi import Connection as DbApiConnection, Cursor as BQDbApiCursor
from google.cloud.bigquery.dbapi import exceptions as dbapi_exceptions

from dlt.common import logger
from dlt.common.configuration.specs import GcpServiceAccountCredentialsWithoutDefaults
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.typing import StrAny
@@ -44,17 +45,30 @@ class BigQueryDBApiCursorImpl(DBApiCursorImpl):
"""Use native BigQuery data frame support if available"""

native_cursor: BQDbApiCursor # type: ignore
df_iterator: Generator[Any, None, None]

def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
if chunk_size is not None:
return super().df(chunk_size=chunk_size)
query_job: bigquery.QueryJob = self.native_cursor._query_job
def __init__(self, curr: DBApiCursor) -> None:
super().__init__(curr)
self.df_iterator = None

def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
query_job: bigquery.QueryJob = self.native_cursor.query_job
if self.df_iterator:
return next(self.df_iterator, None)
try:
if chunk_size is not None:
# create iterator with given page size
self.df_iterator = query_job.result(page_size=chunk_size).to_dataframe_iterable()
return next(self.df_iterator, None)
return query_job.to_dataframe(**kwargs)
except ValueError:
except ValueError as ex:
# no pyarrow/db-types, fallback to our implementation
return super().df()
logger.warning(f"Native BigQuery pandas reader could not be used: {str(ex)}")
return super().df(chunk_size=chunk_size)

def close(self) -> None:
if self.df_iterator:
self.df_iterator.close()


class BigQuerySqlClient(SqlClientBase[bigquery.Client], DBTransaction):
@@ -220,12 +234,11 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
conn.close()

def fully_qualified_dataset_name(self, escape: bool = True) -> str:
project_id = self.capabilities.case_identifier(self.credentials.project_id)
dataset_name = self.capabilities.case_identifier(self.dataset_name)
if escape:
project_id = self.capabilities.escape_identifier(self.credentials.project_id)
dataset_name = self.capabilities.escape_identifier(self.dataset_name)
else:
project_id = self.credentials.project_id
dataset_name = self.dataset_name
project_id = self.capabilities.escape_identifier(project_id)
dataset_name = self.capabilities.escape_identifier(dataset_name)
return f"{project_id}.{dataset_name}"

@classmethod
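The cursor changes above stream BigQuery query results as dataframe chunks. A sketch of how this surfaces through the pipeline's SQL client; the table name and chunk size are placeholders:

import dlt

pipeline = dlt.pipeline(pipeline_name="bq_read_demo", destination="bigquery")

with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM my_table") as cursor:
        # with a chunk_size, df() returns successive frames and finally None
        while (frame := cursor.df(chunk_size=10_000)) is not None:
            print(frame.shape)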
38 changes: 27 additions & 11 deletions dlt/destinations/impl/databricks/sql_client.py
@@ -8,8 +8,6 @@
)
from databricks.sql.exc import Error as DatabricksSqlError

from dlt.common import pendulum
from dlt.common import logger
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.destinations.exceptions import (
DatabaseTerminalException,
@@ -22,10 +20,26 @@
raise_database_error,
raise_open_connection_error,
)
from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction
from dlt.destinations.typing import DBApi, DBApiCursor, DBTransaction, DataFrame
from dlt.destinations.impl.databricks.configuration import DatabricksCredentials
from dlt.destinations.impl.databricks import capabilities
from dlt.common.time import to_py_date, to_py_datetime


class DatabricksCursorImpl(DBApiCursorImpl):
"""Use native data frame support if available"""

native_cursor: DatabricksSqlCursor
vector_size: ClassVar[int] = 2048

def df(self, chunk_size: int = None, **kwargs: Any) -> DataFrame:
if chunk_size is None:
return self.native_cursor.fetchall_arrow().to_pandas()
else:
df = self.native_cursor.fetchmany_arrow(chunk_size).to_pandas()
if df.shape[0] == 0:
return None
else:
return df


class DatabricksSqlClient(SqlClientBase[DatabricksSqlConnection], DBTransaction):
@@ -39,7 +53,9 @@ def __init__(self, dataset_name: str, credentials: DatabricksCredentials) -> Non

def open_connection(self) -> DatabricksSqlConnection:
conn_params = self.credentials.to_connector_params()
self._conn = databricks_lib.connect(**conn_params, schema=self.dataset_name)
self._conn = databricks_lib.connect(
**conn_params, schema=self.dataset_name, use_inline_params="silent"
)
return self._conn

@raise_open_connection_error
@@ -91,6 +107,7 @@ def execute_sql(
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
curr: DBApiCursor = None
# TODO: databricks connector 3.0.0 will use :named paramstyle only
# NOTE: we were able to use the old style until they get deprecated
# if args:
# keys = [f"arg{i}" for i in range(len(args))]
# # Replace position arguments (%s) with named arguments (:arg0, :arg1, ...)
@@ -114,15 +131,14 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
db_args = None
with self._conn.cursor() as curr:
curr.execute(query, db_args)
yield DBApiCursorImpl(curr) # type: ignore[abstract]
yield DatabricksCursorImpl(curr) # type: ignore[abstract]

def fully_qualified_dataset_name(self, escape: bool = True) -> str:
catalog = self.capabilities.case_identifier(self.credentials.catalog)
dataset_name = self.capabilities.case_identifier(self.dataset_name)
if escape:
catalog = self.capabilities.escape_identifier(self.credentials.catalog)
dataset_name = self.capabilities.escape_identifier(self.dataset_name)
else:
catalog = self.credentials.catalog
dataset_name = self.dataset_name
catalog = self.capabilities.escape_identifier(catalog)
dataset_name = self.capabilities.escape_identifier(dataset_name)
return f"{catalog}.{dataset_name}"

@staticmethod