Merge branch 'devel' into feat/continuous-load-jobs
# Conflicts:
#	dlt/load/load.py
#	tests/load/test_dummy_client.py
sh-rp committed Jun 27, 2024
2 parents 78a5989 + c00d408 commit b4d05c8
Showing 310 changed files with 9,077 additions and 4,257 deletions.
10 changes: 1 addition & 9 deletions README.md
@@ -30,20 +30,12 @@ Be it a Google Colab notebook, AWS Lambda function, an Airflow DAG, your local l

dlt supports Python 3.8+.

**pip:**
```sh
pip install dlt
```

**pixi:**
```sh
pixi add dlt
```
More options: [Install via Conda or Pixi](https://dlthub.com/docs/reference/installation#install-dlt-via-pixi-and-conda)

**conda:**
```sh
conda install -c conda-forge dlt
```

## Quick Start

17 changes: 14 additions & 3 deletions dlt/common/configuration/resolve.py
@@ -126,14 +126,21 @@ def _maybe_parse_native_value(
not isinstance(explicit_value, C_Mapping) or isinstance(explicit_value, BaseConfiguration)
):
try:
# parse the native value anyway because there are configs with side effects
config.parse_native_representation(explicit_value)
default_value = config.__class__()
# parse native value and convert it into dict, extract the diff and use it as exact value
# NOTE: as those are the same dataclasses, the set of keys must be the same
explicit_value = {
k: v
for k, v in config.__class__.from_init_value(explicit_value).items()
if default_value[k] != v
}
except ValueError as v_err:
# provide generic exception
raise InvalidNativeValue(type(config), type(explicit_value), embedded_sections, v_err)
except NotImplementedError:
pass
# explicit value was consumed
explicit_value = None
return explicit_value


@@ -336,7 +343,11 @@ def _resolve_config_field(
# print(f"{embedded_config} IS RESOLVED with VALUE {value}")
# injected context will be resolved
if value is not None:
_maybe_parse_native_value(embedded_config, value, embedded_sections + (key,))
from_native_explicit = _maybe_parse_native_value(
embedded_config, value, embedded_sections + (key,)
)
if from_native_explicit is not value:
embedded_config.update(from_native_explicit)
value = embedded_config
else:
# only config with sections may look for initial values
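
The new logic parses the native value, dumps it to a dict, and keeps only the keys that differ from a freshly constructed default instance, so defaults do not mask values coming from other configuration providers. A minimal, self-contained sketch of that "diff against defaults" idea, using a plain dataclass (`DummyCredentials` and its fields are illustrative, not part of dlt):

```python
from dataclasses import dataclass, asdict


@dataclass
class DummyCredentials:
    host: str = "localhost"
    port: int = 5432
    username: str = None


def explicit_diff(parsed: DummyCredentials) -> dict:
    # keep only the fields that the native value actually set
    defaults = asdict(DummyCredentials())
    return {k: v for k, v in asdict(parsed).items() if defaults[k] != v}


print(explicit_diff(DummyCredentials(host="db.example.com", username="loader")))
# -> {'host': 'db.example.com', 'username': 'loader'}
```
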
57 changes: 29 additions & 28 deletions dlt/common/configuration/specs/base_configuration.py
@@ -49,8 +49,7 @@
# forward class declaration
_F_BaseConfiguration: Any = type(object)
_F_ContainerInjectableContext: Any = type(object)
_T = TypeVar("_T", bound="BaseConfiguration")
_C = TypeVar("_C", bound="CredentialsConfiguration")
_B = TypeVar("_B", bound="BaseConfiguration")


class NotResolved:
@@ -289,6 +288,33 @@ class BaseConfiguration(MutableMapping[str, Any]):
"""Typing for dataclass fields"""
__hint_resolvers__: ClassVar[Dict[str, Callable[["BaseConfiguration"], Type[Any]]]] = {}

@classmethod
def from_init_value(cls: Type[_B], init_value: Any = None) -> _B:
"""Initializes credentials from `init_value`
Init value may be a native representation of the credentials or a dict. In case of a native representation (for example a connection string or JSON with service account credentials),
the `parse_native_representation` method is used to parse it. In case of a dict, the credentials object is updated with the dict's key/value pairs.
Unexpected values in the dict will be ignored.
Credentials will be marked as resolved if all required fields are set and the resolve() method is successful.
"""
# create an instance
self = cls()
self._apply_init_value(init_value)
if not self.is_partial():
# let it fail gracefully
with contextlib.suppress(Exception):
self.resolve()
return self

def _apply_init_value(self, init_value: Any = None) -> None:
if isinstance(init_value, C_Mapping):
self.update(init_value)
elif init_value is not None:
self.parse_native_representation(init_value)
else:
return

def parse_native_representation(self, native_value: Any) -> None:
"""Initialize the configuration fields by parsing the `native_value` which should be a native representation of the configuration
or credentials, for example database connection string or JSON serialized GCP service credentials file.
@@ -348,7 +374,7 @@ def resolve(self) -> None:
self.call_method_in_mro("on_resolved")
self.__is_resolved__ = True

def copy(self: _T) -> _T:
def copy(self: _B) -> _B:
"""Returns a deep copy of the configuration instance"""
return copy.deepcopy(self)

@@ -426,38 +452,13 @@ class CredentialsConfiguration(BaseConfiguration):

__section__: ClassVar[str] = "credentials"

@classmethod
def from_init_value(cls: Type[_C], init_value: Any = None) -> _C:
"""Initializes credentials from `init_value`
Init value may be a native representation of the credentials or a dict. In case of native representation (for example a connection string or JSON with service account credentials)
a `parse_native_representation` method will be used to parse it. In case of a dict, the credentials object will be updated with key: values of the dict.
Unexpected values in the dict will be ignored.
Credentials will be marked as resolved if all required fields are set.
"""
# create an instance
self = cls()
self._apply_init_value(init_value)
return self

def to_native_credentials(self) -> Any:
"""Returns native credentials object.
By default calls `to_native_representation` method.
"""
return self.to_native_representation()

def _apply_init_value(self, init_value: Any = None) -> None:
if isinstance(init_value, C_Mapping):
self.update(init_value)
elif init_value is not None:
self.parse_native_representation(init_value)
else:
return
if not self.is_partial():
self.resolve()

def __str__(self) -> str:
"""Get string representation of credentials to be displayed, with all secret parts removed"""
return super().__str__()
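
`from_init_value` (moved here from `CredentialsConfiguration`) accepts either a native representation or a dict, and attempts resolution only when no required field is missing. A usage sketch, assuming `ConnectionStringCredentials` stays exported from `dlt.common.configuration.specs` (the connection string and dict values are made up):

```python
from dlt.common.configuration.specs import ConnectionStringCredentials

# native representation: parsed via parse_native_representation
creds = ConnectionStringCredentials.from_init_value(
    "postgresql://loader:secret@localhost:5432/dlt_data"
)
print(creds.is_resolved())

# dict: known fields are applied via update()
creds = ConnectionStringCredentials.from_init_value({"username": "loader", "host": "localhost"})
# with only some fields set the spec typically remains partial, so resolve() is skipped
print(creds.is_partial())
```
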
25 changes: 22 additions & 3 deletions dlt/common/configuration/specs/connection_string_credentials.py
@@ -15,7 +15,7 @@ class ConnectionStringCredentials(CredentialsConfiguration):
username: str = None
host: Optional[str] = None
port: Optional[int] = None
query: Optional[Dict[str, str]] = None
query: Optional[Dict[str, Any]] = None

__config_gen_annotations__: ClassVar[List[str]] = ["port", "password", "host"]

@@ -44,16 +44,35 @@ def on_resolved(self) -> None:
def to_native_representation(self) -> str:
return self.to_url().render_as_string(hide_password=False)

def get_query(self) -> Dict[str, Any]:
"""Gets query preserving parameter types. Mostly used internally to export connection params"""
return {} if self.query is None else self.query

def to_url(self) -> URL:
"""Creates SQLAlchemy compatible URL object, computes current query via `get_query` and serializes its values to str"""
# circular dependencies here
from dlt.common.configuration.utils import serialize_value

def _serialize_value(v_: Any) -> str:
if v_ is None:
return None
return serialize_value(v_)

# query must be str -> str
query = {k: _serialize_value(v) for k, v in self.get_query().items()}
return URL.create(
self.drivername,
self.username,
self.password,
self.host,
self.port,
self.database,
self.query,
query,
)

def __str__(self) -> str:
return self.to_url().render_as_string(hide_password=True)
url = self.to_url()
# do not display query. it often contains secret values
url = url._replace(query=None)
# we only have control over netloc/path
return url.render_as_string(hide_password=True)
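
With `query` now typed as `Dict[str, Any]`, `to_url()` serializes each query value to text before handing it to SQLAlchemy's `URL.create`, and `__str__` additionally drops the query because it may carry secrets. A small illustrative sketch (values are made up):

```python
from dlt.common.configuration.specs import ConnectionStringCredentials

c = ConnectionStringCredentials.from_init_value("mssql://loader:secret@localhost/dlt_data")
c.query = {"connect_timeout": 15, "TrustServerCertificate": "yes"}  # note the int value

# query values are rendered as strings in the URL
print(c.to_url().render_as_string(hide_password=False))
# the password is masked and the query is omitted entirely
print(str(c))
```
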
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/run_configuration.py
@@ -17,7 +17,7 @@ class RunConfiguration(BaseConfiguration):
dlthub_telemetry: bool = True # enable or disable dlthub telemetry
dlthub_telemetry_endpoint: Optional[str] = "https://telemetry.scalevector.ai"
dlthub_telemetry_segment_write_key: Optional[str] = None
log_format: str = "{asctime}|[{levelname:<21}]|{process}|{thread}|{name}|{filename}|{funcName}:{lineno}|{message}"
log_format: str = "{asctime}|[{levelname}]|{process}|{thread}|{name}|{filename}|{funcName}:{lineno}|{message}"
log_level: str = "WARNING"
request_timeout: float = 60
"""Timeout for http requests"""
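
The level name is no longer padded to a fixed width. The value is a `{`-style format string; a quick way to preview its output with the stdlib `logging` module (illustrative only, dlt applies it through its own logger setup):

```python
import logging

fmt = (
    "{asctime}|[{levelname}]|{process}|{thread}|{name}|{filename}|"
    "{funcName}:{lineno}|{message}"
)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(fmt, style="{"))
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.warning("hello")
```
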
6 changes: 3 additions & 3 deletions dlt/common/configuration/utils.py
@@ -100,21 +100,21 @@ def deserialize_value(key: str, value: Any, hint: Type[TAny]) -> TAny:
raise ConfigValueCannotBeCoercedException(key, value, hint) from exc


def serialize_value(value: Any) -> Any:
def serialize_value(value: Any) -> str:
if value is None:
raise ValueError(value)
# return literal for tuples
if isinstance(value, tuple):
return str(value)
if isinstance(value, BaseConfiguration):
try:
return value.to_native_representation()
return str(value.to_native_representation())
except NotImplementedError:
# no native representation: use dict
value = dict(value)
# coerce type to text which will use json for mapping and sequences
value_dt = py_type_to_sc_type(type(value))
return coerce_value("text", value_dt, value)
return coerce_value("text", value_dt, value) # type: ignore[no-any-return]


def auto_cast(value: str) -> Any:
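
`serialize_value` now always returns text. A small illustration based only on the branches visible above (a sketch, not a test from the repository):

```python
from dlt.common.configuration.utils import serialize_value

assert serialize_value((1, "a")) == "(1, 'a')"  # tuples keep their literal form
assert isinstance(serialize_value(1), str)      # scalars come back as text
try:
    serialize_value(None)
except ValueError:
    pass  # None is rejected explicitly
```
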
6 changes: 4 additions & 2 deletions dlt/common/data_writers/__init__.py
@@ -3,6 +3,7 @@
DataWriterMetrics,
TDataItemFormat,
FileWriterSpec,
create_import_spec,
resolve_best_writer_spec,
get_best_writer_spec,
is_native_writer,
@@ -11,12 +12,13 @@
from dlt.common.data_writers.escape import (
escape_redshift_literal,
escape_redshift_identifier,
escape_bigquery_identifier,
escape_hive_identifier,
)

__all__ = [
"DataWriter",
"FileWriterSpec",
"create_import_spec",
"resolve_best_writer_spec",
"get_best_writer_spec",
"is_native_writer",
@@ -26,5 +28,5 @@
"new_file_id",
"escape_redshift_literal",
"escape_redshift_identifier",
"escape_bigquery_identifier",
"escape_hive_identifier",
]
33 changes: 29 additions & 4 deletions dlt/common/data_writers/buffered.py
@@ -1,11 +1,13 @@
import gzip
import time
from typing import ClassVar, List, IO, Any, Optional, Type, Generic
import contextlib
from typing import ClassVar, Iterator, List, IO, Any, Optional, Type, Generic

from dlt.common.typing import TDataItem, TDataItems
from dlt.common.data_writers.exceptions import (
BufferedDataWriterClosed,
DestinationCapabilitiesRequired,
FileImportNotFound,
InvalidFileNameTemplateException,
)
from dlt.common.data_writers.writers import TWriter, DataWriter, DataWriterMetrics, FileWriterSpec
@@ -138,18 +140,31 @@ def write_empty_file(self, columns: TTableSchemaColumns) -> DataWriterMetrics:
self._last_modified = time.time()
return self._rotate_file(allow_empty_file=True)

def import_file(self, file_path: str, metrics: DataWriterMetrics) -> DataWriterMetrics:
def import_file(
self, file_path: str, metrics: DataWriterMetrics, with_extension: str = None
) -> DataWriterMetrics:
"""Import a file from `file_path` into items storage under a new file name. Does not check
the imported file format. Uses counts from `metrics` as a base. Logically closes the imported file
The preferred import method is a hard link to avoid copying the data. If current filesystem does not
support it, a regular copy is used.
Alternative extension may be provided via `with_extension` so various file formats may be imported into the same folder.
"""
# TODO: we should separate file storage from other storages. this creates circular deps
from dlt.common.storages import FileStorage

self._rotate_file()
FileStorage.link_hard_with_fallback(file_path, self._file_name)
# import file with alternative extension
spec = self.writer_spec
if with_extension:
spec = self.writer_spec._replace(file_extension=with_extension)
with self.alternative_spec(spec):
self._rotate_file()
try:
FileStorage.link_hard_with_fallback(file_path, self._file_name)
except FileNotFoundError as f_ex:
raise FileImportNotFound(file_path, self._file_name) from f_ex

self._last_modified = time.time()
metrics = metrics._replace(
file_path=self._file_name,
@@ -176,6 +191,16 @@ def close(self, skip_flush: bool = False) -> None:
def closed(self) -> bool:
return self._closed

@contextlib.contextmanager
def alternative_spec(self, spec: FileWriterSpec) -> Iterator[FileWriterSpec]:
"""Temporarily changes the writer spec ie. for the moment file is rotated"""
old_spec = self.writer_spec
try:
self.writer_spec = spec
yield spec
finally:
self.writer_spec = old_spec

def __enter__(self) -> "BufferedDataWriter[TWriter]":
return self

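
`alternative_spec` is the usual "swap an attribute for the duration of a with-block" pattern, used here so an imported file can be rotated in under a different extension. A generic, self-contained sketch of that pattern (names are illustrative, not dlt API):

```python
import contextlib
from typing import Any, Iterator


@contextlib.contextmanager
def swapped_attr(obj: Any, name: str, value: Any) -> Iterator[Any]:
    old = getattr(obj, name)
    setattr(obj, name, value)
    try:
        yield value
    finally:
        # restore the previous value even if the body raises
        setattr(obj, name, old)
```
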
31 changes: 31 additions & 0 deletions dlt/common/data_writers/configuration.py
@@ -0,0 +1,31 @@
from typing import ClassVar, Literal, Optional
from dlt.common.configuration import configspec, known_sections
from dlt.common.configuration.specs import BaseConfiguration

CsvQuoting = Literal["quote_all", "quote_needed"]


@configspec
class CsvFormatConfiguration(BaseConfiguration):
delimiter: str = ","
include_header: bool = True
quoting: CsvQuoting = "quote_needed"

# read options
on_error_continue: bool = False
encoding: str = "utf-8"

__section__: ClassVar[str] = known_sections.DATA_WRITER


@configspec
class ParquetFormatConfiguration(BaseConfiguration):
flavor: Optional[str] = None  # could be e.g. "spark"
version: Optional[str] = "2.4"
data_page_size: Optional[int] = None
timestamp_timezone: str = "UTC"
row_group_size: Optional[int] = None
coerce_timestamps: Optional[Literal["s", "ms", "us", "ns"]] = None
allow_truncated_timestamps: bool = False

__section__: ClassVar[str] = known_sections.DATA_WRITER
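
These specs carry the csv and parquet writer options under the `data_writer` section. A construction sketch; whether every keyword is accepted depends on the `configspec`-generated `__init__`, so treat the exact call as an assumption:

```python
from dlt.common.data_writers.configuration import (
    CsvFormatConfiguration,
    ParquetFormatConfiguration,
)

# keyword construction assumed to mirror the declared fields
csv_format = CsvFormatConfiguration(delimiter=";", include_header=False, quoting="quote_all")
parquet_format = ParquetFormatConfiguration(flavor="spark", row_group_size=100_000)
print(dict(csv_format), dict(parquet_format))
```
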
6 changes: 3 additions & 3 deletions dlt/common/data_writers/escape.py
@@ -124,18 +124,18 @@ def escape_redshift_identifier(v: str) -> str:
escape_dremio_identifier = escape_postgres_identifier


def escape_bigquery_identifier(v: str) -> str:
def escape_hive_identifier(v: str) -> str:
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical
return "`" + v.replace("\\", "\\\\").replace("`", "\\`") + "`"


def escape_snowflake_identifier(v: str) -> str:
# Snowflake uppercases all identifiers unless quoted. Match this here so queries on information schema work without issue
# See also https://docs.snowflake.com/en/sql-reference/identifiers-syntax#double-quoted-identifiers
return escape_postgres_identifier(v.upper())
return escape_postgres_identifier(v)


escape_databricks_identifier = escape_bigquery_identifier
escape_databricks_identifier = escape_hive_identifier


DATABRICKS_ESCAPE_DICT = {"'": "\\'", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}
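
The rename reflects that BigQuery, Hive and Databricks all use the same backtick quoting. A worked example derived directly from the implementation above:

```python
from dlt.common.data_writers.escape import escape_hive_identifier

assert escape_hive_identifier("order") == "`order`"
assert escape_hive_identifier("we`ird") == "`we\\`ird`"  # backticks get backslash-escaped
```
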
10 changes: 10 additions & 0 deletions dlt/common/data_writers/exceptions.py
@@ -22,6 +22,16 @@ def __init__(self, file_name: str):
super().__init__(f"Writer with recent file name {file_name} is already closed")


class FileImportNotFound(DataWriterException, FileNotFoundError):
def __init__(self, import_file_path: str, local_file_path: str) -> None:
self.import_file_path = import_file_path
self.local_file_path = local_file_path
super().__init__(
f"Attempt to import non existing file {import_file_path} into extract storage file"
f" {local_file_path}"
)


class DestinationCapabilitiesRequired(DataWriterException, ValueError):
def __init__(self, file_format: TLoaderFileFormat):
self.file_format = file_format
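
Because `FileImportNotFound` subclasses both `DataWriterException` and `FileNotFoundError`, existing `except FileNotFoundError` handlers keep working. A tiny sketch (the paths are made up):

```python
from dlt.common.data_writers.exceptions import FileImportNotFound

try:
    raise FileImportNotFound("/tmp/missing.parquet", "storage/1234.parquet")
except FileNotFoundError as ex:
    print(ex.import_file_path, ex.local_file_path)
```
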
