allows naming conventions to be changed (#998)
* allows decorating async functions with dlt.source

* adds pytest-async and updates pytest to 7.x

* fixes forked teardown issue on pytest 7.x

* bumps deps for py 3.12

* adds py 3.12 common tests

* fixes typings after deps bump

* bumps airflow, yanks duckdb to 0.9.2

* fixes tests

* fixes pandas version

* adds 3.12 duckdb dep

* adds right hand pipe operator

* fixes docker ci build

* adds docs on async sources and resources

* normalizes default hints and preferred types in schema

* defines pipeline state table in utils, column normalization in simple regex

* normalizes all identifiers used by relational normalizer, fixes other modules

* fixes sql job client to use normalized identifiers in queries

* runs state sync tests for lower and upper case naming conventions

* fixes weaviate to use normalized identifiers in queries

* partially fixes qdrant incorrect state and version retrieval queries

* initial sql uppercase naming convention

* adds native df readers to databricks and bigquery

* adds casing identifier capability to support different casing in naming conventions, fixes how identifiers are normalized in destinations

* cleans typing for relational normalizer

* renames escape functions

* destination capabilities for case fold and case sensitivity

* drops supports naming module and allows naming to be an instance in config and schema

* checks all tables in information schema in one go, observes case folding and sensitivity in sql destinations

* moves schema verification to destination utils

* adds method to remove processing hints from schema, helper functions for schema settings, refactor, tests

* accepts naming convention instances when resolving configs

* fixes the cloning of schema in decorator, removes processing hints

* removes processing hints when saving imported schema

* adds docs on naming conventions, removes technical docs

* adds casing info to databricks caps, makes caps an instance attr

* adjusts destination casing in caps from schema naming and config

* raises detailed schema identifier clash exceptions

* adds is_case_sensitive and name to NamingConvention

* adds sanity check that the _dlt prefix is preserved

* finds generic types in non-generic classes deriving from generic

* uses casefolded column names in INSERT VALUES jobs

* adds a method make_qualified_table_name_path that calculates components of fully qualified table name and uses it to query INFO SCHEMA

* adds casing info to destinations, caps as instance attrs, custom table name paths

* adds naming convention to restore state tests, make them essential

* fixes table builder tests

* removes processing hints when exporting schema to import folder, warns on schema import overriding local schema, warns on processing hints present

* allows subclassing INFO SCHEMA query generation and uses a specialized BigQuery override

* uses correct schema escaping function in sql jobs

* passes pipeline state to package state via extract

* fixes optional normalizers module

* excludes version_hash from pipeline state SELECT

* passes pipeline state to package state pt.2

* re-enables sentry tests

* bumps qdrant client, makes tests run against the local version

* makes weaviate tests run

* uses schemata to find databases on athena

* uses api get_table for hidden dataset on bigquery to reflect schemas, supports case insensitive datasets

* adds naming conventions to two restore state tests

* fixes escape identifiers to column escape

* fixes conflicts in docs

* adjusts capabilities in capabilities() method, uses config and naming optionally

* allows adding props to classes without a vectorizer in weaviate

* moves caps function into factories, cleans up adapters and custom destination

* sentry_dsn

* adds basic destination reference tests

* fixes table builder tests

* fixes deps and docs

* fixes more tests

* case sensitivity docs stubs

* fixes drop_pipeline fixture

* improves partial config generation for capabilities

* adds snowflake csv support

* creates separate csv tests

* allows importing files into extract storage, adds import file writer and spec

* handles ImportFileMeta in extractor

* adds import file item normalizer and router to normalize

* supports csv format config for snowflake

* removes realpath wherever possible and adds fast make_full_path to FileStorage

* adds additional methods to load_package storage to make listings faster

* adds file_format to dlt.resource, uses preferred file format for dlt state table

* docs for importing files, file_format

* code improvements and tests

* docs hard links note

* moves loader parallelism test to pipelines, solves duckdb ci test error issue

* fixes tests

* moves drop_pipeline fixture level up

* drops default naming convention from caps so naming in saved schema persists, allows (section, <schema_name>, schema) config section for schema settings

* unifies all representations of pipeline state

* tries to decompress text file first in fs_client

* tests get stored state in test_job_client

* removes credentials from dlt.attach, adds destination and staging factories

* cleans up env variables and pipeline dropping fixture precedence

* removes dev_mode from dlt.attach

* adds missing arguments to filesystem factory

* fixes tests

* updates destination and naming convention docs

* removes is_case_sensitive from naming convention initializer

* simplifies with_file_import mark

* adds case sensitivity tests

* uses dev_mode everywhere

* improves csv docs

* fixes encodings in fsspec

* improves naming convention docs

* fixes tests and renames clash to collision

* fixes getting original bases from instance
rudolfix authored Jun 26, 2024
1 parent 3d009dc commit b76f8f4
Showing 282 changed files with 7,924 additions and 4,060 deletions.
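The headline change, switching naming conventions, surfaces through dlt's configuration. Below is a minimal sketch of how a user might pick a different convention for a pipeline; the "sql_ci_v1" convention name and the SCHEMA__NAMING variable follow dlt's documented config layout and are illustrative assumptions, not part of this diff:

import os
import dlt

# assumption: the naming convention is resolved from the schema config section
os.environ["SCHEMA__NAMING"] = "sql_ci_v1"  # case-insensitive SQL identifiers

@dlt.resource
def users():
    yield {"UserName": "ada", "Signup-Date": "2024-06-26"}

pipeline = dlt.pipeline(pipeline_name="naming_demo", destination="duckdb")
pipeline.run(users())  # column names are normalized by the configured convention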
6 changes: 4 additions & 2 deletions dlt/common/data_writers/__init__.py
@@ -3,6 +3,7 @@
DataWriterMetrics,
TDataItemFormat,
FileWriterSpec,
create_import_spec,
resolve_best_writer_spec,
get_best_writer_spec,
is_native_writer,
@@ -11,12 +12,13 @@
from dlt.common.data_writers.escape import (
escape_redshift_literal,
escape_redshift_identifier,
escape_bigquery_identifier,
escape_hive_identifier,
)

__all__ = [
"DataWriter",
"FileWriterSpec",
"create_import_spec",
"resolve_best_writer_spec",
"get_best_writer_spec",
"is_native_writer",
@@ -26,5 +28,5 @@
"new_file_id",
"escape_redshift_literal",
"escape_redshift_identifier",
"escape_bigquery_identifier",
"escape_hive_identifier",
]
33 changes: 29 additions & 4 deletions dlt/common/data_writers/buffered.py
@@ -1,11 +1,13 @@
import gzip
import time
from typing import ClassVar, List, IO, Any, Optional, Type, Generic
import contextlib
from typing import ClassVar, Iterator, List, IO, Any, Optional, Type, Generic

from dlt.common.typing import TDataItem, TDataItems
from dlt.common.data_writers.exceptions import (
BufferedDataWriterClosed,
DestinationCapabilitiesRequired,
FileImportNotFound,
InvalidFileNameTemplateException,
)
from dlt.common.data_writers.writers import TWriter, DataWriter, DataWriterMetrics, FileWriterSpec
@@ -138,18 +140,31 @@ def write_empty_file(self, columns: TTableSchemaColumns) -> DataWriterMetrics:
self._last_modified = time.time()
return self._rotate_file(allow_empty_file=True)

def import_file(self, file_path: str, metrics: DataWriterMetrics) -> DataWriterMetrics:
def import_file(
self, file_path: str, metrics: DataWriterMetrics, with_extension: str = None
) -> DataWriterMetrics:
"""Import a file from `file_path` into items storage under a new file name. Does not check
the imported file format. Uses counts from `metrics` as a base. Logically closes the imported file.
The preferred import method is a hard link to avoid copying the data. If the current filesystem does not
support it, a regular copy is used.
Alternative extension may be provided via `with_extension` so various file formats may be imported into the same folder.
"""
# TODO: we should separate file storage from other storages. this creates circular deps
from dlt.common.storages import FileStorage

self._rotate_file()
FileStorage.link_hard_with_fallback(file_path, self._file_name)
# import file with alternative extension
spec = self.writer_spec
if with_extension:
spec = self.writer_spec._replace(file_extension=with_extension)
with self.alternative_spec(spec):
self._rotate_file()
try:
FileStorage.link_hard_with_fallback(file_path, self._file_name)
except FileNotFoundError as f_ex:
raise FileImportNotFound(file_path, self._file_name) from f_ex

self._last_modified = time.time()
metrics = metrics._replace(
file_path=self._file_name,
@@ -176,6 +191,16 @@ def close(self, skip_flush: bool = False) -> None:
def closed(self) -> bool:
return self._closed

@contextlib.contextmanager
def alternative_spec(self, spec: FileWriterSpec) -> Iterator[FileWriterSpec]:
"""Temporarily changes the writer spec ie. for the moment file is rotated"""
old_spec = self.writer_spec
try:
self.writer_spec = spec
yield spec
finally:
self.writer_spec = old_spec

def __enter__(self) -> "BufferedDataWriter[TWriter]":
return self

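For context, the hard-link-then-copy strategy that `import_file` delegates to `FileStorage.link_hard_with_fallback` amounts to the following standalone sketch (illustrative, not the dlt implementation):

import os
import shutil

def link_or_copy(src: str, dst: str) -> None:
    # prefer a hard link so no data is copied; fall back to a regular copy
    # when the filesystem does not support hard links
    try:
        os.link(src, dst)
    except OSError:
        shutil.copy2(src, dst)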
31 changes: 31 additions & 0 deletions dlt/common/data_writers/configuration.py
@@ -0,0 +1,31 @@
from typing import ClassVar, Literal, Optional
from dlt.common.configuration import configspec, known_sections
from dlt.common.configuration.specs import BaseConfiguration

CsvQuoting = Literal["quote_all", "quote_needed"]


@configspec
class CsvFormatConfiguration(BaseConfiguration):
delimiter: str = ","
include_header: bool = True
quoting: CsvQuoting = "quote_needed"

# read options
on_error_continue: bool = False
encoding: str = "utf-8"

__section__: ClassVar[str] = known_sections.DATA_WRITER


@configspec
class ParquetFormatConfiguration(BaseConfiguration):
flavor: Optional[str] = None  # could be e.g. "spark"
version: Optional[str] = "2.4"
data_page_size: Optional[int] = None
timestamp_timezone: str = "UTC"
row_group_size: Optional[int] = None
coerce_timestamps: Optional[Literal["s", "ms", "us", "ns"]] = None
allow_truncated_timestamps: bool = False

__section__: ClassVar[str] = known_sections.DATA_WRITER
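Both specs set `__section__` to `known_sections.DATA_WRITER`, so their fields should resolve from the data_writer config section. A hedged example of overriding the CSV options through environment variables; the SECTION__KEY spelling mirrors dlt's config resolution and the exact keys are assumptions:

import os

# assumed layout: <section>__<field>, resolved by dlt's config system
os.environ["DATA_WRITER__DELIMITER"] = "|"
os.environ["DATA_WRITER__INCLUDE_HEADER"] = "false"
os.environ["DATA_WRITER__QUOTING"] = "quote_all"  # CsvQuoting: "quote_all" or "quote_needed"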
6 changes: 3 additions & 3 deletions dlt/common/data_writers/escape.py
@@ -124,18 +124,18 @@ def escape_redshift_identifier(v: str) -> str:
escape_dremio_identifier = escape_postgres_identifier


def escape_bigquery_identifier(v: str) -> str:
def escape_hive_identifier(v: str) -> str:
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical
return "`" + v.replace("\\", "\\\\").replace("`", "\\`") + "`"


def escape_snowflake_identifier(v: str) -> str:
# Snowflake uppercases all identifiers unless quoted. Match this here so queries on information schema work without issue
# See also https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers
return escape_postgres_identifier(v.upper())
return escape_postgres_identifier(v)


escape_databricks_identifier = escape_bigquery_identifier
escape_databricks_identifier = escape_hive_identifier


DATABRICKS_ESCAPE_DICT = {"'": "\\'", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}
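The rename is mechanical: the same backtick-quoting rule serves BigQuery and other Hive-dialect engines such as Databricks, hence the more general name. The Snowflake escape no longer upper-cases at escape time, which is consistent with the casing-related commits above that move case folding into destination capabilities. A standalone check of the escaping behavior, copied from the function body above:

def escape_hive_identifier(v: str) -> str:
    return "`" + v.replace("\\", "\\\\").replace("`", "\\`") + "`"

assert escape_hive_identifier("my`col") == "`my\\`col`"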
10 changes: 10 additions & 0 deletions dlt/common/data_writers/exceptions.py
@@ -22,6 +22,16 @@ def __init__(self, file_name: str):
super().__init__(f"Writer with recent file name {file_name} is already closed")


class FileImportNotFound(DataWriterException, FileNotFoundError):
def __init__(self, import_file_path: str, local_file_path: str) -> None:
self.import_file_path = import_file_path
self.local_file_path = local_file_path
super().__init__(
f"Attempt to import non existing file {import_file_path} into extract storage file"
f" {local_file_path}"
)


class DestinationCapabilitiesRequired(DataWriterException, ValueError):
def __init__(self, file_format: TLoaderFileFormat):
self.file_format = file_format
80 changes: 47 additions & 33 deletions dlt/common/data_writers/writers.py
@@ -4,7 +4,6 @@
IO,
TYPE_CHECKING,
Any,
ClassVar,
Dict,
List,
Literal,
@@ -17,24 +16,33 @@
)

from dlt.common.json import json
from dlt.common.configuration import configspec, known_sections, with_config
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.configuration import with_config
from dlt.common.data_writers.exceptions import (
SpecLookupFailed,
DataWriterNotFound,
FileFormatForItemFormatNotFound,
FileSpecNotFound,
InvalidDataItem,
)
from dlt.common.destination import DestinationCapabilitiesContext, TLoaderFileFormat
from dlt.common.data_writers.configuration import (
CsvFormatConfiguration,
CsvQuoting,
ParquetFormatConfiguration,
)
from dlt.common.destination import (
DestinationCapabilitiesContext,
TLoaderFileFormat,
ALL_SUPPORTED_FILE_FORMATS,
)
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.common.typing import StrAny


if TYPE_CHECKING:
from dlt.common.libs.pyarrow import pyarrow as pa


TDataItemFormat = Literal["arrow", "object"]
TDataItemFormat = Literal["arrow", "object", "file"]
TWriter = TypeVar("TWriter", bound="DataWriter")


@@ -124,6 +132,9 @@ def item_format_from_file_extension(cls, extension: str) -> TDataItemFormat:
return "object"
elif extension == "parquet":
return "arrow"
# those files may be imported by normalizer as is
elif extension in ALL_SUPPORTED_FILE_FORMATS:
return "file"
else:
raise ValueError(f"Cannot figure out data item format for extension {extension}")

Expand All @@ -132,6 +143,8 @@ def writer_class_from_spec(spec: FileWriterSpec) -> Type["DataWriter"]:
try:
return WRITER_SPECS[spec]
except KeyError:
if spec.data_item_format == "file":
return ImportFileWriter
raise FileSpecNotFound(spec.file_format, spec.data_item_format, spec)

@staticmethod
@@ -147,6 +160,19 @@ def class_factory(
raise FileFormatForItemFormatNotFound(file_format, data_item_format)


class ImportFileWriter(DataWriter):
"""May only import files, fails on any open/write operations"""

def write_header(self, columns_schema: TTableSchemaColumns) -> None:
raise NotImplementedError(
"ImportFileWriter cannot write any files. You have bug in your code."
)

@classmethod
def writer_spec(cls) -> FileWriterSpec:
raise NotImplementedError("ImportFileWriter has no single spec")


class JsonlWriter(DataWriter):
def write_data(self, rows: Sequence[Any]) -> None:
super().write_data(rows)
@@ -260,21 +286,8 @@ def writer_spec(cls) -> FileWriterSpec:
)


@configspec
class ParquetDataWriterConfiguration(BaseConfiguration):
flavor: Optional[str] = None # could be ie. "spark"
version: Optional[str] = "2.4"
data_page_size: Optional[int] = None
timestamp_timezone: str = "UTC"
row_group_size: Optional[int] = None
coerce_timestamps: Optional[Literal["s", "ms", "us", "ns"]] = None
allow_truncated_timestamps: bool = False

__section__: ClassVar[str] = known_sections.DATA_WRITER


class ParquetDataWriter(DataWriter):
@with_config(spec=ParquetDataWriterConfiguration)
@with_config(spec=ParquetFormatConfiguration)
def __init__(
self,
f: IO[Any],
@@ -381,20 +394,8 @@ def writer_spec(cls) -> FileWriterSpec:
)


CsvQuoting = Literal["quote_all", "quote_needed"]


@configspec
class CsvDataWriterConfiguration(BaseConfiguration):
delimiter: str = ","
include_header: bool = True
quoting: CsvQuoting = "quote_needed"

__section__: ClassVar[str] = known_sections.DATA_WRITER


class CsvWriter(DataWriter):
@with_config(spec=CsvDataWriterConfiguration)
@with_config(spec=CsvFormatConfiguration)
def __init__(
self,
f: IO[Any],
@@ -525,7 +526,7 @@ def writer_spec(cls) -> FileWriterSpec:


class ArrowToCsvWriter(DataWriter):
@with_config(spec=CsvDataWriterConfiguration)
@with_config(spec=CsvFormatConfiguration)
def __init__(
self,
f: IO[Any],
@@ -783,3 +784,16 @@ def get_best_writer_spec(
return DataWriter.class_factory(file_format, item_format, native_writers).writer_spec()
except DataWriterNotFound:
return DataWriter.class_factory(file_format, item_format, ALL_WRITERS).writer_spec()


def create_import_spec(
item_file_format: TLoaderFileFormat,
possible_file_formats: Sequence[TLoaderFileFormat],
) -> FileWriterSpec:
"""Creates writer spec that may be used only to import files"""
# can the item file be directly imported?
if item_file_format not in possible_file_formats:
raise SpecLookupFailed("file", possible_file_formats, item_file_format)

spec = DataWriter.class_factory(item_file_format, "object", ALL_WRITERS).writer_spec()
return spec._replace(data_item_format="file")
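A hedged usage sketch of the import path: `create_import_spec` reuses the writer spec of an existing loader file format and re-tags it as a "file" data item, so the normalizer can route such files through unchanged. The format names below are assumed to be valid loader file formats:

from dlt.common.data_writers import create_import_spec

# build a spec for importing ready-made csv files into extract storage
spec = create_import_spec("csv", ["csv", "parquet", "jsonl"])
assert spec.data_item_format == "file"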