Commit

Merge branch 'devel' into d#/process_docs
sh-rp committed Mar 21, 2024
2 parents 1b1aee5 + 1f2b4ce commit bd31bdf
Showing 52 changed files with 5,403 additions and 4,580 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/test_common.yml
@@ -95,6 +95,19 @@ jobs:
name: Run smoke tests with minimum deps Windows
shell: cmd
- name: Install pyarrow
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk

- run: |
poetry run pytest tests/pipeline/test_pipeline_extra.py -k arrow
if: runner.os != 'Windows'
name: Run pipeline tests with pyarrow but no pandas installed
- run: |
poetry run pytest tests/pipeline/test_pipeline_extra.py -k arrow
if: runner.os == 'Windows'
name: Run pipeline tests with pyarrow but no pandas installed Windows
shell: cmd
- name: Install pipeline dependencies
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline

2 changes: 1 addition & 1 deletion .github/workflows/test_doc_snippets.yml
@@ -58,7 +58,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant --with docs,sentry-sdk --without airflow
run: poetry install --no-interaction -E duckdb -E weaviate -E parquet -E qdrant -E bigquery --with docs,sentry-sdk --without airflow

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > docs/website/docs/.dlt/secrets.toml
3 changes: 2 additions & 1 deletion Makefile
@@ -47,7 +47,8 @@ dev: has-poetry
poetry install --all-extras --with airflow --with docs --with providers --with pipeline --with sentry-sdk

lint:
./check-package.sh
./tools/check-package.sh
poetry run python ./tools/check-lockfile.py
poetry run mypy --config-file mypy.ini dlt tests
poetry run flake8 --max-line-length=200 dlt
poetry run flake8 --max-line-length=200 tests --exclude tests/reflection/module_cases
14 changes: 10 additions & 4 deletions dlt/common/configuration/inject.py
@@ -96,19 +96,25 @@ def with_config(
def decorator(f: TFun) -> TFun:
SPEC: Type[BaseConfiguration] = None
sig: Signature = inspect.signature(f)
signature_fields: Dict[str, Any]
kwargs_arg = next(
(p for p in sig.parameters.values() if p.kind == Parameter.VAR_KEYWORD), None
)
spec_arg: Parameter = None
pipeline_name_arg: Parameter = None
if spec is None:
SPEC = spec_from_signature(f, sig, include_defaults, base=base)
SPEC, signature_fields = spec_from_signature(f, sig, include_defaults, base=base)
else:
SPEC = spec
signature_fields = SPEC.get_resolvable_fields()

if SPEC is None:
# if no signature fields were added we will not wrap `f` for injection
if len(signature_fields) == 0:
# always register new function
_FUNC_SPECS[id(f)] = SPEC
return f

spec_arg: Parameter = None
pipeline_name_arg: Parameter = None

for p in sig.parameters.values():
# for all positional parameters that do not have default value, set default
# if hasattr(SPEC, p.name) and p.default == Parameter.empty:
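The change above means `with_config` now inspects the fields synthesized from the signature and skips wrapping entirely when there is nothing to inject. A minimal sketch of the two cases, assuming `with_config` can be applied directly as a bare decorator (illustration only, not part of this commit):

import dlt
from dlt.common.configuration.inject import with_config

@with_config
def plain(x: int):
    # no defaults marked with dlt.config.value / dlt.secrets.value:
    # no signature fields, so the function is registered but returned unwrapped
    return x * 2

@with_config
def configured(api_url: str = dlt.config.value):
    # `api_url` is a resolvable field, so a SPEC is synthesized and injection applies
    return api_url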
1 change: 1 addition & 0 deletions dlt/common/destination/capabilities.py
@@ -55,6 +55,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
insert_values_writer_type: str = "default"
supports_multiple_statements: bool = True
supports_clone_table: bool = False
max_table_nesting: Optional[int] = None # destination can overwrite max table nesting
"""Destination supports CREATE TABLE ... CLONE ... statements"""

# do not allow to create default value, destination caps must be always explicitly inserted into container
21 changes: 21 additions & 0 deletions dlt/common/destination/reference.py
@@ -260,6 +260,27 @@ def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
return []


class DoNothingJob(LoadJob):
"""The most lazy class of dlt"""

def __init__(self, file_path: str) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))

def state(self) -> TLoadJobState:
# this job is always done
return "completed"

def exception(self) -> str:
# this part of the code should never be reached
raise NotImplementedError()


class DoNothingFollowupJob(DoNothingJob, FollowupJob):
"""The second most lazy class of dlt"""

pass


class JobClientBase(ABC):
capabilities: ClassVar[DestinationCapabilitiesContext] = None

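With `DoNothingJob` and `DoNothingFollowupJob` promoted from the Athena destination into the shared reference module (see the Athena hunk below, where they are removed), any job client can return them for load files that require no work. A hedged usage sketch with a hypothetical file path:

from dlt.common.destination.reference import DoNothingJob

job = DoNothingJob("/load_package/events.1234.reference")  # hypothetical path
assert job.state() == "completed"  # the job reports itself as done immediately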
9 changes: 8 additions & 1 deletion dlt/common/libs/pandas.py
@@ -1,7 +1,14 @@
from typing import Any
from dlt.common.exceptions import MissingDependencyException

try:
import pandas
from pandas.io.sql import _wrap_result
except ModuleNotFoundError:
raise MissingDependencyException("DLT Pandas Helpers", ["pandas"])


def pandas_to_arrow(df: pandas.DataFrame) -> Any:
"""Converts pandas to arrow or raises an exception if pyarrow is not installed"""
from dlt.common.libs.pyarrow import pyarrow as pa

return pa.Table.from_pandas(df)
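A short usage sketch for the new `pandas_to_arrow` helper, assuming pandas and pyarrow are both installed (it raises the missing-dependency error otherwise):

import pandas as pd
from dlt.common.libs.pandas import pandas_to_arrow

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
table = pandas_to_arrow(df)  # a pyarrow.Table built via pa.Table.from_pandas
print(table.schema)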
7 changes: 7 additions & 0 deletions dlt/common/libs/pandas_sql.py
@@ -0,0 +1,7 @@
from dlt.common.exceptions import MissingDependencyException


try:
from pandas.io.sql import _wrap_result
except ModuleNotFoundError:
raise MissingDependencyException("dlt pandas helper for sql", ["pandas"])
4 changes: 3 additions & 1 deletion dlt/common/libs/pyarrow.py
@@ -18,7 +18,9 @@
import pyarrow.compute
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt parquet Helpers", [f"{version.DLT_PKG_NAME}[parquet]"], "dlt Helpers for for parquet."
"dlt pyarrow helpers",
[f"{version.DLT_PKG_NAME}[parquet]"],
"Install pyarrow to be allow to load arrow tables, panda frames and to use parquet files.",
)


14 changes: 12 additions & 2 deletions dlt/common/normalizers/configuration.py
@@ -5,7 +5,7 @@
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.normalizers.typing import TJSONNormalizer
from dlt.common.typing import StrAny
from dlt.common.typing import DictStrAny


@configspec
@@ -14,14 +14,24 @@ class NormalizersConfiguration(BaseConfiguration):
__section__: str = "schema"

naming: Optional[str] = None
json_normalizer: Optional[StrAny] = None
json_normalizer: Optional[DictStrAny] = None
destination_capabilities: Optional[DestinationCapabilitiesContext] = None # injectable

def on_resolved(self) -> None:
# get naming from capabilities if not present
if self.naming is None:
if self.destination_capabilities:
self.naming = self.destination_capabilities.naming_convention
# if max_table_nesting is set, we need to set the max_table_nesting in the json_normalizer
if (
self.destination_capabilities
and self.destination_capabilities.max_table_nesting is not None
):
self.json_normalizer = self.json_normalizer or {}
self.json_normalizer.setdefault("config", {})
self.json_normalizer["config"][
"max_nesting"
] = self.destination_capabilities.max_table_nesting

if TYPE_CHECKING:

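Together with the new `max_table_nesting` capability above, `on_resolved` now copies the destination's limit into the relational normalizer config. A hedged sketch of the propagation, assuming both configspec classes can be instantiated directly as shown:

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.normalizers.configuration import NormalizersConfiguration

caps = DestinationCapabilitiesContext()
caps.max_table_nesting = 0  # e.g. a destination that only wants flat tables

cfg = NormalizersConfiguration()
cfg.destination_capabilities = caps
cfg.on_resolved()

# the limit ends up in the json normalizer config under "max_nesting"
assert cfg.json_normalizer["config"]["max_nesting"] == 0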
8 changes: 5 additions & 3 deletions dlt/common/normalizers/utils.py
@@ -34,9 +34,11 @@ def import_normalizers(
"""
# add defaults to normalizer_config
normalizers_config["names"] = names = normalizers_config["names"] or "snake_case"
normalizers_config["json"] = item_normalizer = normalizers_config["json"] or {
"module": "dlt.common.normalizers.json.relational"
}
# set default json normalizer module
normalizers_config["json"] = item_normalizer = normalizers_config.get("json") or {}
if "module" not in item_normalizer:
item_normalizer["module"] = "dlt.common.normalizers.json.relational"

try:
if "." in names:
# TODO: bump schema engine version and migrate schema. also change the name in TNormalizersConfig from names to naming
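The rewritten defaulting in `import_normalizers` keeps any user-supplied "json" settings and only fills in the missing module. The same logic restated as plain Python (illustration only):

normalizers_config = {"names": None, "json": {"config": {"max_nesting": 1}}}

item_normalizer = normalizers_config.get("json") or {}
if "module" not in item_normalizer:
    item_normalizer["module"] = "dlt.common.normalizers.json.relational"
normalizers_config["json"] = item_normalizer

# the user's config survives and only the module default was added
assert item_normalizer == {
    "config": {"max_nesting": 1},
    "module": "dlt.common.normalizers.json.relational",
}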
39 changes: 28 additions & 11 deletions dlt/common/reflection/spec.py
@@ -1,6 +1,6 @@
import re
import inspect
from typing import Dict, List, Type, Any, Optional, NewType
from typing import Dict, List, Tuple, Type, Any, Optional, NewType
from inspect import Signature, Parameter

from dlt.common.typing import AnyType, AnyFun, TSecretValue
@@ -30,14 +30,27 @@ def spec_from_signature(
sig: Signature,
include_defaults: bool = True,
base: Type[BaseConfiguration] = BaseConfiguration,
) -> Type[BaseConfiguration]:
) -> Tuple[Type[BaseConfiguration], Dict[str, Any]]:
"""Creates a SPEC on base `base1 for a function `f` with signature `sig`.
All the arguments in `sig` that are valid SPEC hints and have defaults will be part of the SPEC.
Special markers for required SPEC fields `dlt.secrets.value` and `dlt.config.value` are parsed using
module source code, which is a hack and will not work for modules not imported from a file.
The name of a SPEC type is inferred from qualname of `f` and type will refer to `f` module and is unique
for a module. NOTE: the SPECS are cached in the module by using name as an id.
Return value is a tuple of SPEC and SPEC fields created from a `sig`.
"""
name = _get_spec_name_from_f(f)
module = inspect.getmodule(f)
base_fields = base.get_resolvable_fields()

# check if spec for that function exists
spec_id = name # f"SPEC_{name}_kw_only_{kw_only}"
if hasattr(module, spec_id):
return getattr(module, spec_id) # type: ignore
MOD_SPEC: Type[BaseConfiguration] = getattr(module, spec_id)
return MOD_SPEC, MOD_SPEC.get_resolvable_fields()

# find all the arguments that have following defaults
literal_defaults: Dict[str, str] = None
@@ -62,7 +75,8 @@ def dlt_config_literal_to_type(arg_name: str) -> AnyType:
return None

# synthesize configuration from the signature
fields: Dict[str, Any] = {}
new_fields: Dict[str, Any] = {}
sig_base_fields: Dict[str, Any] = {}
annotations: Dict[str, Any] = {}

for p in sig.parameters.values():
@@ -72,6 +86,10 @@ def dlt_config_literal_to_type(arg_name: str) -> AnyType:
"cls",
]:
field_type = AnyType if p.annotation == Parameter.empty else p.annotation
# keep the base fields if sig not annotated
if p.name in base_fields and field_type is AnyType and p.default is None:
sig_base_fields[p.name] = base_fields[p.name]
continue
# only valid hints and parameters with defaults are eligible
if is_valid_hint(field_type) and p.default != Parameter.empty:
# try to get type from default
@@ -102,18 +120,17 @@
# set annotations
annotations[p.name] = field_type
# set field with default value
fields[p.name] = p.default
new_fields[p.name] = p.default

if not fields:
return None
signature_fields = {**sig_base_fields, **new_fields}

# new type goes to the module where sig was declared
fields["__module__"] = module.__name__
new_fields["__module__"] = module.__name__
# set annotations so they are present in __dict__
fields["__annotations__"] = annotations
new_fields["__annotations__"] = annotations
# synthesize type
T: Type[BaseConfiguration] = type(name, (base,), fields)
T: Type[BaseConfiguration] = type(name, (base,), new_fields)
SPEC = configspec()(T)
# add to the module
setattr(module, spec_id, SPEC)
return SPEC
return SPEC, signature_fields
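A hedged sketch of the new return shape of `spec_from_signature`: the synthesized SPEC comes back together with the fields derived from the signature (illustration only; per the docstring, the source parsing of dlt.config.value markers only works for functions defined in a module file):

import inspect
import dlt
from dlt.common.reflection.spec import spec_from_signature

def load_data(api_url: str = dlt.config.value, limit: int = 100):
    pass

SPEC, fields = spec_from_signature(load_data, inspect.signature(load_data))
print(SPEC.__name__)    # configuration type synthesized for load_data
print(sorted(fields))   # expected to contain "api_url" and "limit"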
49 changes: 44 additions & 5 deletions dlt/destinations/decorators.py
@@ -1,15 +1,16 @@
import functools

from typing import Any, Type, Optional, Callable, Union
from typing import Any, Type, Optional, Callable, Union, cast
from typing_extensions import Concatenate
from dlt.common.typing import AnyFun

from functools import wraps

from dlt.common import logger
from dlt.destinations.impl.destination.factory import destination as _destination
from dlt.destinations.impl.destination.configuration import (
TDestinationCallableParams,
GenericDestinationClientConfiguration,
CustomDestinationClientConfiguration,
)
from dlt.common.destination import TLoaderFileFormat
from dlt.common.destination.reference import Destination
@@ -18,16 +19,47 @@


def destination(
*,
func: Optional[AnyFun] = None,
/,
loader_file_format: TLoaderFileFormat = None,
batch_size: int = 10,
name: str = None,
naming_convention: str = "direct",
spec: Type[GenericDestinationClientConfiguration] = GenericDestinationClientConfiguration,
skip_dlt_columns_and_tables: bool = True,
max_table_nesting: int = 0,
spec: Type[CustomDestinationClientConfiguration] = None,
) -> Callable[
[Callable[Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any]],
Callable[TDestinationCallableParams, _destination],
]:
"""A decorator that transforms a function that takes two positional arguments "table" and "items" and any number of keyword arguments with defaults
into a callable that will create a custom destination. The function does not return anything, the keyword arguments can be configuration and secrets values.
#### Example Usage with Configuration and Secrets:
>>> @dlt.destination(batch_size=100, loader_file_format="parquet")
>>> def my_destination(items, table, api_url: str = dlt.config.value, api_secret = dlt.secrets.value):
>>> print(table["name"])
>>> print(items)
>>>
>>> p = dlt.pipeline("chess_pipeline", destination=my_destination)
Here all incoming data will be sent to the destination function with the items in the requested format and the dlt table schema.
The config and secret values will be resolved from the path destination.my_destination.api_url and destination.my_destination.api_secret.
#### Args:
batch_size: defines how many items per function call are batched together and sent as an array. If you set a batch-size of 0, instead of passing in actual dataitems, you will receive one call per load job with the path of the file as the items argument. You can then open and process that file in any way you like.
loader_file_format: defines in which format files are stored in the load package before being sent to the destination function, this can be puae-jsonl or parquet.
name: defines the name of the destination that get's created by the destination decorator, defaults to the name of the function
naming_convention: defines the name of the destination that gets created by the destination decorator. This controls how table and column names are normalized. The default is direct which will keep all names the same.
max_nesting_level: defines how deep the normalizer will go to normalize complex fields on your data to create subtables. This overwrites any settings on your source and is set to zero to not create any nested tables by default.
skip_dlt_columns_and_tables: defines wether internal tables and columns will be fed into the custom destination function. This is set to True by default.
spec: defines a configuration spec that will be used to to inject arguments into the decorated functions. Argument not in spec will not be injected
Returns:
A callable that can be used to create a dlt custom destination instance
"""

def decorator(
destination_callable: Callable[
Concatenate[Union[TDataItems, str], TTableSchema, TDestinationCallableParams], Any
@@ -49,9 +81,16 @@ def wrapper(
batch_size=batch_size,
destination_name=name,
naming_convention=naming_convention,
skip_dlt_columns_and_tables=skip_dlt_columns_and_tables,
max_table_nesting=max_table_nesting,
**kwargs, # type: ignore
)

return wrapper

return decorator
if func is None:
# we're called with parens.
return decorator

# we're called as @destination without parens.
return decorator(func) # type: ignore
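The new positional-only `func` parameter lets the decorator be used both bare and with arguments. A hedged sketch of the two call styles, mirroring the docstring example above (destination names and behavior are illustrative):

import dlt

@dlt.destination  # bare: func is passed positionally and all defaults apply
def print_sink(items, table) -> None:
    print(table["name"], len(items))

@dlt.destination(batch_size=0, loader_file_format="parquet", max_table_nesting=1)
def file_sink(file_path, table) -> None:
    # with batch_size=0 the items argument is the path of the load file
    print(table["name"], file_path)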
23 changes: 1 addition & 22 deletions dlt/destinations/impl/athena/athena.py
@@ -37,7 +37,7 @@
from dlt.common.schema.typing import TTableSchema, TColumnType, TWriteDisposition, TTableFormat
from dlt.common.schema.utils import table_schema_has_type, get_table_format
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.reference import LoadJob, FollowupJob
from dlt.common.destination.reference import LoadJob, DoNothingFollowupJob, DoNothingJob
from dlt.common.destination.reference import TLoadJobState, NewLoadJob, SupportsStagingDestination
from dlt.common.storages import FileStorage
from dlt.common.data_writers.escape import escape_bigquery_identifier
@@ -149,27 +149,6 @@ def __init__(self) -> None:
DLTAthenaFormatter._INSTANCE = self


class DoNothingJob(LoadJob):
"""The most lazy class of dlt"""

def __init__(self, file_path: str) -> None:
super().__init__(FileStorage.get_file_name_from_file_path(file_path))

def state(self) -> TLoadJobState:
# this job is always done
return "completed"

def exception(self) -> str:
# this part of code should be never reached
raise NotImplementedError()


class DoNothingFollowupJob(DoNothingJob, FollowupJob):
"""The second most lazy class of dlt"""

pass


class AthenaSQLClient(SqlClientBase[Connection]):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
dbapi: ClassVar[DBApi] = pyathena