Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allows naming conventions to be changed #998

Merged
merged 114 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 105 commits
Commits
Show all changes
114 commits
Select commit Hold shift + click to select a range
93995b9
allows to decorate async function with dlt.source
rudolfix Feb 20, 2024
4444878
adds pytest-async and updates pytest to 7.x
rudolfix Feb 20, 2024
b3b70f6
fixes forked teardown issue 7.x
rudolfix Feb 20, 2024
f5d7a0a
bumps deps for py 3.12
rudolfix Feb 21, 2024
83dc38a
adds py 12 common tests
rudolfix Feb 21, 2024
21ebfee
fixes typings after deps bump
rudolfix Feb 21, 2024
7985f9d
bumps airflow, yanks duckdb to 0.9.2
rudolfix Feb 21, 2024
07f285e
fixes tests
rudolfix Feb 21, 2024
06e441e
fixes pandas version
rudolfix Feb 21, 2024
3e846a1
adds 3.12 duckdb dep
rudolfix Feb 22, 2024
37b4a31
Merge branch 'devel' into rfix/enables-async-source
rudolfix Feb 22, 2024
934c167
Merge branch 'devel' into rfix/enables-async-source
rudolfix Feb 24, 2024
7fa574d
adds right hand pipe operator
rudolfix Feb 24, 2024
8c7942d
fixes docker ci build
rudolfix Feb 24, 2024
f951fc0
adds docs on async sources and resources
rudolfix Feb 24, 2024
387a7c7
normalizes default hints and preferred types in schema
rudolfix Feb 25, 2024
88728e1
defines pipeline state table in utils, column normalization in simple…
rudolfix Feb 25, 2024
1a53425
normalizes all identifiers used by relational normalizer, fixes other…
rudolfix Feb 25, 2024
8835023
fixes sql job client to use normalized identifiers in queries
rudolfix Feb 25, 2024
f4c504f
runs state sync tests for lower and upper case naming conventions
rudolfix Feb 25, 2024
874cc29
fixes weaviate to use normalized identifiers in queries
rudolfix Feb 25, 2024
c4e9f35
partially fixes qdrant incorrect state and version retrieval queries
rudolfix Feb 25, 2024
6345377
initial sql uppercase naming convention
rudolfix Feb 25, 2024
96a02ff
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Mar 8, 2024
aef8cc2
adds native df readers to databricks and bigquery
rudolfix Mar 9, 2024
a53c00b
adds casing identifier capability to support different casing in nami…
rudolfix Mar 9, 2024
91f5780
cleans typing for relational normalizer
rudolfix Mar 9, 2024
5984824
renames escape functions
rudolfix Mar 18, 2024
3458441
destination capabilities for case fold and case sensitivity
rudolfix Mar 18, 2024
55362b0
drops supports naming module and allows naming to be instance in conf…
rudolfix Mar 18, 2024
b836dfe
checks all tables in information schema in one go, observes case fold…
rudolfix Mar 18, 2024
e50bfaa
moves schema verification to destination utils
rudolfix Mar 18, 2024
42d149f
adds method to remove processing hints from schema, helper functions …
rudolfix Mar 18, 2024
c53808f
accepts naming convention instances when resolving configs
rudolfix Mar 18, 2024
b97ae53
fixes the cloning of schema in decorator, removes processing hints
rudolfix Mar 18, 2024
0132c2f
removes processing hints when saving imported schema
rudolfix Mar 18, 2024
2a7c5dd
adds docs on naming conventions, removes technical docs
rudolfix Mar 18, 2024
d502c7c
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 4, 2024
3e7504b
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 6, 2024
3bb929f
adds casing info to databrick caps, makes caps an instance attr
rudolfix Jun 11, 2024
9f0920c
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 11, 2024
724dc15
adjusts destination casing in caps from schema naming and config
rudolfix Jun 11, 2024
b58a118
raises detailed schema identifier clash exceptions
rudolfix Jun 11, 2024
d190ea1
adds is_case_sensitive and name to NamingConvention
rudolfix Jun 11, 2024
b445654
adds sanity check if _dlt prefix is preserved
rudolfix Jun 11, 2024
ee8a95b
finds genric types in non generic classes deriving from generic
rudolfix Jun 11, 2024
eb30838
uses casefold INSERT VALUES job column names
rudolfix Jun 11, 2024
558db91
adds a method make_qualified_table_name_path that calculates componen…
rudolfix Jun 11, 2024
dea9669
adds casing info to destinations, caps as instance attrs, custom tabl…
rudolfix Jun 11, 2024
b1e2b09
adds naming convention to restore state tests, make them essential
rudolfix Jun 11, 2024
210be70
fixes table builder tests
rudolfix Jun 11, 2024
95b703d
removes processing hints when exporting schema to import folder, warn…
rudolfix Jun 12, 2024
4b72b77
allows to subclass INFO SCHEMA query generation and uses specialized …
rudolfix Jun 12, 2024
ab39e06
uses correct schema escaping function in sql jobs
rudolfix Jun 12, 2024
2ae3ad2
passes pipeline state to package state via extract
rudolfix Jun 12, 2024
09b7731
fixes optional normalizers module
rudolfix Jun 12, 2024
cfd3e5f
excludes version_hash from pipeline state SELECT
rudolfix Jun 12, 2024
0edbbfd
passes pipeline state to package state pt.2
rudolfix Jun 12, 2024
5769ba1
re-enables sentry tests
rudolfix Jun 12, 2024
1f17a44
bumps qdrant client, makes test running for local version
rudolfix Jun 12, 2024
71e418b
makes weaviate running
rudolfix Jun 12, 2024
ce414e1
uses schemata to find databases on athena
rudolfix Jun 13, 2024
bde61a9
uses api get_table for hidden dataset on bigquery to reflect schemas,…
rudolfix Jun 13, 2024
036e3dd
adds naming conventions to two restore state tests
rudolfix Jun 13, 2024
8546763
fixes escape identifiers to column escape
rudolfix Jun 13, 2024
f57e286
fix conflicts in docs
rudolfix Jun 13, 2024
cf50bd4
adjusts capabilities in capabilities() method, uses config and naming…
rudolfix Jun 15, 2024
72969ce
allows to add props to classes without vectorizer in weaviate
rudolfix Jun 15, 2024
656d5fc
moves caps function into factories, cleansup adapters and custom dest…
rudolfix Jun 15, 2024
bbd7fe6
sentry_dsn
rudolfix Jun 15, 2024
a671508
adds basic destination reference tests
rudolfix Jun 15, 2024
81e0db9
fixes table builder tests
rudolfix Jun 15, 2024
8a32793
fix deps and docs
rudolfix Jun 15, 2024
0dc6dc8
fixes more tests
rudolfix Jun 16, 2024
4a39795
case sensitivity docs stubs
rudolfix Jun 17, 2024
43d6d5f
fixes drop_pipeline fixture
rudolfix Jun 17, 2024
e3d998c
improves partial config generation for capabilities
rudolfix Jun 17, 2024
3aef3fd
adds snowflake csv support
rudolfix Jun 17, 2024
6df7a34
creates separate csv tests
rudolfix Jun 17, 2024
57aec2e
allows to import files into extract storage, adds import file writer …
rudolfix Jun 19, 2024
fee7af5
handles ImportFileMeta in extractor
rudolfix Jun 19, 2024
96c7222
adds import file item normalizer and router to normalize
rudolfix Jun 19, 2024
116add0
supports csv format config for snowflake
rudolfix Jun 19, 2024
42eacaf
removes realpath wherever possible and adds fast make_full_path to Fi…
rudolfix Jun 20, 2024
3793d06
adds additional methods to load_package storage to make listings faster
rudolfix Jun 20, 2024
88eec9c
adds file_format to dlt.resource, uses preferred file format for dlt …
rudolfix Jun 21, 2024
8e0f0a8
docs for importing files, file_format
rudolfix Jun 21, 2024
b1c095c
code improvements and tests
rudolfix Jun 21, 2024
46ec732
docs hard links note
rudolfix Jun 21, 2024
2194b18
Merge pull request #1479 from dlt-hub/feat/snowflake-csv-support
rudolfix Jun 21, 2024
1384ed3
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 21, 2024
b00cbb2
moves loader parallelism test to pipeliens, solves duckdb ci test err…
rudolfix Jun 23, 2024
a530345
fixes tests
rudolfix Jun 23, 2024
4271895
moves drop_pipeline fixture level up
rudolfix Jun 23, 2024
abd02df
drops default naming convention from caps so naming in saved schema p…
rudolfix Jun 24, 2024
14b4b0e
unifies all representations of pipeline state
rudolfix Jun 24, 2024
60e45b1
tries to decompress text file first in fs_client
rudolfix Jun 24, 2024
a84be2a
tests get stored state in test_job_client
rudolfix Jun 24, 2024
1dc7a09
removes credentials from dlt.attach, addes destination and staging fa…
rudolfix Jun 24, 2024
ab69b76
cleans up env variables and pipeline dropping fixutere precedence
rudolfix Jun 24, 2024
0eeb21d
Merge branch 'devel' into rfix/allows-naming-conventions
rudolfix Jun 24, 2024
f1097d8
removes dev_mode from dlt.attach
rudolfix Jun 24, 2024
3855fcc
adds missing arguments to filesystem factory
rudolfix Jun 24, 2024
651412e
fixes tests
rudolfix Jun 24, 2024
aab36e1
updates destination and naming convention docs
rudolfix Jun 25, 2024
7294aae
removes is_case_sensitive from naming convention initializer
rudolfix Jun 26, 2024
dc10473
simplifies with_file_import mark
rudolfix Jun 26, 2024
727a35e
adds case sensitivity tests
rudolfix Jun 26, 2024
4cb2646
uses dev_mode everywhere
rudolfix Jun 26, 2024
f098e5a
improves csv docs
rudolfix Jun 26, 2024
1521778
fixes encodings in fsspec
rudolfix Jun 26, 2024
796483e
improves naming convention docs
rudolfix Jun 26, 2024
534c7f8
fixes tests and renames clash to collision
rudolfix Jun 26, 2024
5f4cb4c
fixes getting original bases from instance
rudolfix Jun 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dlt/common/data_writers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
DataWriterMetrics,
TDataItemFormat,
FileWriterSpec,
create_import_spec,
resolve_best_writer_spec,
get_best_writer_spec,
is_native_writer,
Expand All @@ -11,12 +12,13 @@
from dlt.common.data_writers.escape import (
escape_redshift_literal,
escape_redshift_identifier,
escape_bigquery_identifier,
escape_hive_identifier,
)

__all__ = [
"DataWriter",
"FileWriterSpec",
"create_import_spec",
"resolve_best_writer_spec",
"get_best_writer_spec",
"is_native_writer",
Expand All @@ -26,5 +28,5 @@
"new_file_id",
"escape_redshift_literal",
"escape_redshift_identifier",
"escape_bigquery_identifier",
"escape_hive_identifier",
]
33 changes: 29 additions & 4 deletions dlt/common/data_writers/buffered.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import gzip
import time
from typing import ClassVar, List, IO, Any, Optional, Type, Generic
import contextlib
from typing import ClassVar, Iterator, List, IO, Any, Optional, Type, Generic

from dlt.common.typing import TDataItem, TDataItems
from dlt.common.data_writers.exceptions import (
BufferedDataWriterClosed,
DestinationCapabilitiesRequired,
FileImportNotFound,
InvalidFileNameTemplateException,
)
from dlt.common.data_writers.writers import TWriter, DataWriter, DataWriterMetrics, FileWriterSpec
Expand Down Expand Up @@ -138,18 +140,31 @@ def write_empty_file(self, columns: TTableSchemaColumns) -> DataWriterMetrics:
self._last_modified = time.time()
return self._rotate_file(allow_empty_file=True)

def import_file(self, file_path: str, metrics: DataWriterMetrics) -> DataWriterMetrics:
def import_file(
self, file_path: str, metrics: DataWriterMetrics, with_extension: str = None
) -> DataWriterMetrics:
"""Import a file from `file_path` into items storage under a new file name. Does not check
the imported file format. Uses counts from `metrics` as a base. Logically closes the imported file
The preferred import method is a hard link to avoid copying the data. If current filesystem does not
support it, a regular copy is used.
Alternative extension may be provided via `with_extension` so various file formats may be imported into the same folder.
"""
# TODO: we should separate file storage from other storages. this creates circular deps
from dlt.common.storages import FileStorage

self._rotate_file()
FileStorage.link_hard_with_fallback(file_path, self._file_name)
# import file with alternative extension
spec = self.writer_spec
if with_extension:
spec = self.writer_spec._replace(file_extension=with_extension)
with self.alternative_spec(spec):
self._rotate_file()
try:
FileStorage.link_hard_with_fallback(file_path, self._file_name)
except FileNotFoundError as f_ex:
raise FileImportNotFound(file_path, self._file_name) from f_ex

self._last_modified = time.time()
metrics = metrics._replace(
file_path=self._file_name,
Expand All @@ -176,6 +191,16 @@ def close(self, skip_flush: bool = False) -> None:
def closed(self) -> bool:
return self._closed

@contextlib.contextmanager
def alternative_spec(self, spec: FileWriterSpec) -> Iterator[FileWriterSpec]:
"""Temporarily changes the writer spec ie. for the moment file is rotated"""
old_spec = self.writer_spec
try:
self.writer_spec = spec
yield spec
finally:
self.writer_spec = old_spec

def __enter__(self) -> "BufferedDataWriter[TWriter]":
return self

Expand Down
31 changes: 31 additions & 0 deletions dlt/common/data_writers/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import ClassVar, Literal, Optional
from dlt.common.configuration import configspec, known_sections
from dlt.common.configuration.specs import BaseConfiguration

CsvQuoting = Literal["quote_all", "quote_needed"]


@configspec
class CsvFormatConfiguration(BaseConfiguration):
delimiter: str = ","
include_header: bool = True
quoting: CsvQuoting = "quote_needed"

# read options
on_error_continue: bool = False
encoding: str = "utf-8"

__section__: ClassVar[str] = known_sections.DATA_WRITER


@configspec
class ParquetFormatConfiguration(BaseConfiguration):
flavor: Optional[str] = None # could be ie. "spark"
version: Optional[str] = "2.4"
data_page_size: Optional[int] = None
timestamp_timezone: str = "UTC"
row_group_size: Optional[int] = None
coerce_timestamps: Optional[Literal["s", "ms", "us", "ns"]] = None
allow_truncated_timestamps: bool = False

__section__: ClassVar[str] = known_sections.DATA_WRITER
6 changes: 3 additions & 3 deletions dlt/common/data_writers/escape.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,18 @@ def escape_redshift_identifier(v: str) -> str:
escape_dremio_identifier = escape_postgres_identifier


def escape_bigquery_identifier(v: str) -> str:
def escape_hive_identifier(v: str) -> str:
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical
return "`" + v.replace("\\", "\\\\").replace("`", "\\`") + "`"


def escape_snowflake_identifier(v: str) -> str:
# Snowcase uppercase all identifiers unless quoted. Match this here so queries on information schema work without issue
# See also https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers
return escape_postgres_identifier(v.upper())
return escape_postgres_identifier(v)


escape_databricks_identifier = escape_bigquery_identifier
escape_databricks_identifier = escape_hive_identifier


DATABRICKS_ESCAPE_DICT = {"'": "\\'", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}
Expand Down
10 changes: 10 additions & 0 deletions dlt/common/data_writers/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ def __init__(self, file_name: str):
super().__init__(f"Writer with recent file name {file_name} is already closed")


class FileImportNotFound(DataWriterException, FileNotFoundError):
def __init__(self, import_file_path: str, local_file_path: str) -> None:
self.import_file_path = import_file_path
self.local_file_path = local_file_path
super().__init__(
f"Attempt to import non existing file {import_file_path} into extract storage file"
f" {local_file_path}"
)


class DestinationCapabilitiesRequired(DataWriterException, ValueError):
def __init__(self, file_format: TLoaderFileFormat):
self.file_format = file_format
Expand Down
80 changes: 47 additions & 33 deletions dlt/common/data_writers/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
IO,
TYPE_CHECKING,
Any,
ClassVar,
Dict,
List,
Literal,
Expand All @@ -17,24 +16,33 @@
)

from dlt.common.json import json
from dlt.common.configuration import configspec, known_sections, with_config
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.configuration import with_config
from dlt.common.data_writers.exceptions import (
SpecLookupFailed,
DataWriterNotFound,
FileFormatForItemFormatNotFound,
FileSpecNotFound,
InvalidDataItem,
)
from dlt.common.destination import DestinationCapabilitiesContext, TLoaderFileFormat
from dlt.common.data_writers.configuration import (
CsvFormatConfiguration,
CsvQuoting,
ParquetFormatConfiguration,
)
from dlt.common.destination import (
DestinationCapabilitiesContext,
TLoaderFileFormat,
ALL_SUPPORTED_FILE_FORMATS,
)
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import StrAny


if TYPE_CHECKING:
from dlt.common.libs.pyarrow import pyarrow as pa


TDataItemFormat = Literal["arrow", "object"]
TDataItemFormat = Literal["arrow", "object", "file"]
TWriter = TypeVar("TWriter", bound="DataWriter")


Expand Down Expand Up @@ -124,6 +132,9 @@ def item_format_from_file_extension(cls, extension: str) -> TDataItemFormat:
return "object"
elif extension == "parquet":
return "arrow"
# those files may be imported by normalizer as is
elif extension in ALL_SUPPORTED_FILE_FORMATS:
return "file"
else:
raise ValueError(f"Cannot figure out data item format for extension {extension}")

Expand All @@ -132,6 +143,8 @@ def writer_class_from_spec(spec: FileWriterSpec) -> Type["DataWriter"]:
try:
return WRITER_SPECS[spec]
except KeyError:
if spec.data_item_format == "file":
return ImportFileWriter
raise FileSpecNotFound(spec.file_format, spec.data_item_format, spec)

@staticmethod
Expand All @@ -147,6 +160,19 @@ def class_factory(
raise FileFormatForItemFormatNotFound(file_format, data_item_format)


class ImportFileWriter(DataWriter):
"""May only import files, fails on any open/write operations"""

def write_header(self, columns_schema: TTableSchemaColumns) -> None:
raise NotImplementedError(
"ImportFileWriter cannot write any files. You have bug in your code."
)

@classmethod
def writer_spec(cls) -> FileWriterSpec:
raise NotImplementedError("ImportFileWriter has no single spec")


class JsonlWriter(DataWriter):
def write_data(self, rows: Sequence[Any]) -> None:
super().write_data(rows)
Expand Down Expand Up @@ -260,21 +286,8 @@ def writer_spec(cls) -> FileWriterSpec:
)


@configspec
class ParquetDataWriterConfiguration(BaseConfiguration):
flavor: Optional[str] = None # could be ie. "spark"
version: Optional[str] = "2.4"
data_page_size: Optional[int] = None
timestamp_timezone: str = "UTC"
row_group_size: Optional[int] = None
coerce_timestamps: Optional[Literal["s", "ms", "us", "ns"]] = None
allow_truncated_timestamps: bool = False

__section__: ClassVar[str] = known_sections.DATA_WRITER


class ParquetDataWriter(DataWriter):
@with_config(spec=ParquetDataWriterConfiguration)
@with_config(spec=ParquetFormatConfiguration)
def __init__(
self,
f: IO[Any],
Expand Down Expand Up @@ -381,20 +394,8 @@ def writer_spec(cls) -> FileWriterSpec:
)


CsvQuoting = Literal["quote_all", "quote_needed"]


@configspec
class CsvDataWriterConfiguration(BaseConfiguration):
delimiter: str = ","
include_header: bool = True
quoting: CsvQuoting = "quote_needed"

__section__: ClassVar[str] = known_sections.DATA_WRITER


class CsvWriter(DataWriter):
@with_config(spec=CsvDataWriterConfiguration)
@with_config(spec=CsvFormatConfiguration)
def __init__(
self,
f: IO[Any],
Expand Down Expand Up @@ -525,7 +526,7 @@ def writer_spec(cls) -> FileWriterSpec:


class ArrowToCsvWriter(DataWriter):
@with_config(spec=CsvDataWriterConfiguration)
@with_config(spec=CsvFormatConfiguration)
def __init__(
self,
f: IO[Any],
Expand Down Expand Up @@ -783,3 +784,16 @@ def get_best_writer_spec(
return DataWriter.class_factory(file_format, item_format, native_writers).writer_spec()
except DataWriterNotFound:
return DataWriter.class_factory(file_format, item_format, ALL_WRITERS).writer_spec()


def create_import_spec(
item_file_format: TLoaderFileFormat,
possible_file_formats: Sequence[TLoaderFileFormat],
) -> FileWriterSpec:
"""Creates writer spec that may be used only to import files"""
# can the item file be directly imported?
if item_file_format not in possible_file_formats:
raise SpecLookupFailed("file", possible_file_formats, item_file_format)

spec = DataWriter.class_factory(item_file_format, "object", ALL_WRITERS).writer_spec()
return spec._replace(data_item_format="file")
Loading
Loading