Skip to content

Commit

Permalink
Merge branch 'devel' into fix/streamlit_bug
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Jun 10, 2024
2 parents 9ca287c + 3609757 commit a00cb0a
Show file tree
Hide file tree
Showing 147 changed files with 4,992 additions and 1,427 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ jobs:
shell: cmd
- name: Install pipeline dependencies
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline -E deltalake

- run: |
poetry run pytest tests/extract tests/pipeline tests/libs tests/cli/common tests/destinations
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/test_destination_qdrant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ jobs:
run_loader:
name: dest | qdrant tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
# if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
if: false # TODO re-enable with above line
defaults:
run:
shell: bash
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ env:
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
# Test redshift and filesystem with all buckets
# postgres runs again here so we can test on mac/windows
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"
ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\", \"motherduck\"]"

jobs:
get_docs_changes:
Expand Down Expand Up @@ -75,7 +75,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_local_destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
Expand Down
9 changes: 5 additions & 4 deletions dlt/cli/config_toml_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from tomlkit.container import Container as TOMLContainer
from collections.abc import Sequence as C_Sequence

from dlt.common.configuration.specs.base_configuration import is_hint_not_resolvable
from dlt.common.pendulum import pendulum
from dlt.common.configuration.specs import (
BaseConfiguration,
is_base_configuration_inner_hint,
extract_inner_hint,
)
from dlt.common.data_types import py_type_to_sc_type
from dlt.common.typing import AnyType, is_final_type, is_optional_type
from dlt.common.typing import AnyType, is_optional_type, is_subclass


class WritableConfigValue(NamedTuple):
Expand All @@ -34,7 +35,7 @@ def generate_typed_example(name: str, hint: AnyType) -> Any:
if sc_type == "bool":
return True
if sc_type == "complex":
if issubclass(inner_hint, C_Sequence):
if is_subclass(inner_hint, C_Sequence):
return ["a", "b", "c"]
else:
table = tomlkit.table(False)
Expand Down Expand Up @@ -62,9 +63,9 @@ def write_value(
# skip if table contains the name already
if name in toml_table and not overwrite_existing:
return
# do not dump final and optional fields if they are not of special interest
# do not dump nor resolvable and optional fields if they are not of special interest
if (
is_final_type(hint) or is_optional_type(hint) or default_value is not None
is_hint_not_resolvable(hint) or is_optional_type(hint) or default_value is not None
) and not is_default_of_interest:
return
# get the inner hint to generate cool examples
Expand Down
13 changes: 8 additions & 5 deletions dlt/cli/deploy_command_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,22 +263,25 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
if n.PIPELINE in visitor.known_calls:
for call_args in visitor.known_calls[n.PIPELINE]:
pipeline_name, pipelines_dir = None, None
f_r_node = call_args.arguments.get("full_refresh")
# Check both full_refresh/dev_mode until full_refresh option is removed from dlt
f_r_node = call_args.arguments.get("full_refresh") or call_args.arguments.get(
"dev_mode"
)
if f_r_node:
f_r_value = evaluate_node_literal(f_r_node)
if f_r_value is None:
fmt.warning(
"The value of `full_refresh` in call to `dlt.pipeline` cannot be"
"The value of `dev_mode` in call to `dlt.pipeline` cannot be"
f" determined from {unparse(f_r_node).strip()}. We assume that you know"
" what you are doing :)"
)
if f_r_value is True:
if fmt.confirm(
"The value of 'full_refresh' is set to True. Do you want to abort to set it"
" to False?",
"The value of 'dev_mode' or 'full_refresh' is set to True. Do you want to"
" abort to set it to False?",
default=True,
):
raise CliCommandException("deploy", "Please set the full_refresh to False")
raise CliCommandException("deploy", "Please set the dev_mode to False")

p_d_node = call_args.arguments.get("pipelines_dir")
if p_d_node:
Expand Down
8 changes: 2 additions & 6 deletions dlt/common/configuration/accessors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import abc
import contextlib
import tomlkit
from typing import Any, ClassVar, List, Sequence, Tuple, Type, TypeVar

from dlt.common.configuration.container import Container
Expand All @@ -9,10 +7,8 @@
from dlt.common.configuration.specs import BaseConfiguration, is_base_configuration_inner_hint
from dlt.common.configuration.utils import deserialize_value, log_traces, auto_cast
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
from dlt.common.typing import AnyType, ConfigValue, TSecretValue
from dlt.common.typing import AnyType, ConfigValue, SecretValue, TSecretValue

DLT_SECRETS_VALUE = "secrets.value"
DLT_CONFIG_VALUE = "config.value"
TConfigAny = TypeVar("TConfigAny", bound=Any)


Expand Down Expand Up @@ -129,7 +125,7 @@ def writable_provider(self) -> ConfigProvider:
p for p in self._get_providers_from_context() if p.is_writable and p.supports_secrets
)

value: ClassVar[Any] = ConfigValue
value: ClassVar[Any] = SecretValue
"A placeholder that tells dlt to replace it with actual secret during the call to a source or resource decorated function."


Expand Down
3 changes: 2 additions & 1 deletion dlt/common/configuration/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ContainerInjectableContextMangled,
ContextDefaultCannotBeCreated,
)
from dlt.common.typing import is_subclass

TConfiguration = TypeVar("TConfiguration", bound=ContainerInjectableContext)

Expand Down Expand Up @@ -56,7 +57,7 @@ def __init__(self) -> None:

def __getitem__(self, spec: Type[TConfiguration]) -> TConfiguration:
# return existing config object or create it from spec
if not issubclass(spec, ContainerInjectableContext):
if not is_subclass(spec, ContainerInjectableContext):
raise KeyError(f"{spec.__name__} is not a context")

context, item = self._thread_getitem(spec)
Expand Down
93 changes: 54 additions & 39 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import inspect

from functools import wraps
from typing import Callable, Dict, Type, Any, Optional, Tuple, TypeVar, overload, cast
from typing import Callable, Dict, Type, Any, Optional, Union, Tuple, TypeVar, overload, cast
from inspect import Signature, Parameter
from contextlib import nullcontext

from dlt.common.typing import DictStrAny, StrAny, TFun, AnyFun
from dlt.common.typing import DictStrAny, TFun, AnyFun
from dlt.common.configuration.resolve import resolve_configuration, inject_section
from dlt.common.configuration.specs.base_configuration import BaseConfiguration
from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
Expand All @@ -15,22 +14,24 @@

_LAST_DLT_CONFIG = "_dlt_config"
_ORIGINAL_ARGS = "_dlt_orig_args"
# keep a registry of all the decorated functions
_FUNC_SPECS: Dict[int, Type[BaseConfiguration]] = {}

TConfiguration = TypeVar("TConfiguration", bound=BaseConfiguration)


def get_fun_spec(f: AnyFun) -> Type[BaseConfiguration]:
return _FUNC_SPECS.get(id(f))
return getattr(f, "__SPEC__", None) # type: ignore[no-any-return]


def set_fun_spec(f: AnyFun, spec: Type[BaseConfiguration]) -> None:
"""Assigns a spec to a callable from which it was inferred"""
setattr(f, "__SPEC__", spec) # noqa: B010


@overload
def with_config(
func: TFun,
/,
spec: Type[BaseConfiguration] = None,
sections: Tuple[str, ...] = (),
sections: Union[str, Tuple[str, ...]] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
Expand All @@ -46,7 +47,7 @@ def with_config(
func: None = ...,
/,
spec: Type[BaseConfiguration] = None,
sections: Tuple[str, ...] = (),
sections: Union[str, Tuple[str, ...]] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
Expand All @@ -61,7 +62,7 @@ def with_config(
func: Optional[AnyFun] = None,
/,
spec: Type[BaseConfiguration] = None,
sections: Tuple[str, ...] = (),
sections: Union[str, Tuple[str, ...]] = (),
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
Expand All @@ -88,17 +89,18 @@ def with_config(
Callable[[TFun], TFun]: A decorated function
"""

section_f: Callable[[StrAny], str] = None
# section may be a function from function arguments to section
if callable(sections):
section_f = sections

def decorator(f: TFun) -> TFun:
SPEC: Type[BaseConfiguration] = None
sig: Signature = inspect.signature(f)
signature_fields: Dict[str, Any]
# find variadic kwargs to which additional arguments and injection context can be injected
kwargs_arg = next(
(p for p in sig.parameters.values() if p.kind == Parameter.VAR_KEYWORD), None
(
p
for p in sig.parameters.values()
if p.kind == Parameter.VAR_KEYWORD and p.name == "injection_kwargs"
),
None,
)
if spec is None:
SPEC, signature_fields = spec_from_signature(f, sig, include_defaults, base=base)
Expand All @@ -109,7 +111,7 @@ def decorator(f: TFun) -> TFun:
# if no signature fields were added we will not wrap `f` for injection
if len(signature_fields) == 0:
# always register new function
_FUNC_SPECS[id(f)] = SPEC
set_fun_spec(f, SPEC)
return f

spec_arg: Parameter = None
Expand All @@ -127,20 +129,23 @@ def decorator(f: TFun) -> TFun:
pipeline_name_arg = p
pipeline_name_arg_default = None if p.default == Parameter.empty else p.default

def resolve_config(bound_args: inspect.BoundArguments) -> BaseConfiguration:
def resolve_config(
bound_args: inspect.BoundArguments, accept_partial_: bool
) -> BaseConfiguration:
"""Resolve arguments using the provided spec"""
# bind parameters to signature
# for calls containing resolved spec in the kwargs, we do not need to resolve again
config: BaseConfiguration = None

# if section derivation function was provided then call it
if section_f:
curr_sections: Tuple[str, ...] = (section_f(bound_args.arguments),)
# sections may be a string
elif isinstance(sections, str):
curr_sections = (sections,)
curr_sections: Union[str, Tuple[str, ...]] = None
# section may be a function from function arguments to section
if callable(sections):
curr_sections = sections(bound_args.arguments)
else:
curr_sections = sections
# sections may be a string
if isinstance(curr_sections, str):
curr_sections = (curr_sections,)

# if one of arguments is spec the use it as initial value
if initial_config:
Expand All @@ -162,18 +167,19 @@ def resolve_config(bound_args: inspect.BoundArguments) -> BaseConfiguration:

# this may be called from many threads so section_context is thread affine
with inject_section(section_context, lock_context=lock_context_on_injection):
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections}")
# print(f"RESOLVE CONF in inject: {f.__name__}: {section_context.sections} vs {sections} in {bound_args.arguments}")
return resolve_configuration(
config or SPEC(),
explicit_value=bound_args.arguments,
accept_partial=accept_partial,
accept_partial=accept_partial_,
)

def update_bound_args(
bound_args: inspect.BoundArguments, config: BaseConfiguration, args: Any, kwargs: Any
) -> None:
# overwrite or add resolved params
resolved_params = dict(config)
# print("resolved_params", resolved_params)
# overwrite or add resolved params
for p in sig.parameters.values():
if p.name in resolved_params:
Expand All @@ -191,11 +197,18 @@ def update_bound_args(

def with_partially_resolved_config(config: Optional[BaseConfiguration] = None) -> Any:
# creates a pre-resolved partial of the decorated function
empty_bound_args = sig.bind_partial()
if not config:
config = resolve_config(empty_bound_args)

def wrapped(*args: Any, **kwargs: Any) -> Any:
# TODO: this will not work if correct config is not provided
# esp. in case of parameters in _wrap being ConfigurationBase
# at least we should implement re-resolve with explicit parameters
# so we can merge partial we get here to combine a full config
empty_bound_args = sig.bind_partial()
# TODO: resolve partial here that will be updated in _wrap
config = resolve_config(empty_bound_args, accept_partial_=False)

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
# TODO: we should not change the outer config but deepcopy it
nonlocal config

# Do we need an exception here?
Expand All @@ -213,27 +226,28 @@ def wrapped(*args: Any, **kwargs: Any) -> Any:

# call the function with the pre-resolved config
bound_args = sig.bind(*args, **kwargs)
# TODO: update partial config with bound_args (to cover edge cases with embedded configs)
update_bound_args(bound_args, config, args, kwargs)
return f(*bound_args.args, **bound_args.kwargs)

return wrapped
return _wrap

@wraps(f)
def _wrap(*args: Any, **kwargs: Any) -> Any:
# Resolve config
config: BaseConfiguration = None
bound_args = sig.bind(*args, **kwargs)
bound_args = sig.bind_partial(*args, **kwargs)
if _LAST_DLT_CONFIG in kwargs:
config = last_config(**kwargs)
else:
config = resolve_config(bound_args)
config = resolve_config(bound_args, accept_partial_=accept_partial)

# call the function with resolved config
update_bound_args(bound_args, config, args, kwargs)
return f(*bound_args.args, **bound_args.kwargs)

# register the spec for a wrapped function
_FUNC_SPECS[id(_wrap)] = SPEC
set_fun_spec(_wrap, SPEC)

# add a method to create a pre-resolved partial
setattr(_wrap, "__RESOLVED_PARTIAL_FUNC__", with_partially_resolved_config) # noqa: B010
Expand All @@ -255,13 +269,14 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
return decorator(func)


def last_config(**kwargs: Any) -> Any:
"""Get configuration instance used to inject function arguments"""
return kwargs[_LAST_DLT_CONFIG]
def last_config(**injection_kwargs: Any) -> Any:
"""Get configuration instance used to inject function kwargs"""
return injection_kwargs[_LAST_DLT_CONFIG]


def get_orig_args(**kwargs: Any) -> Tuple[Tuple[Any], DictStrAny]:
return kwargs[_ORIGINAL_ARGS] # type: ignore
def get_orig_args(**injection_kwargs: Any) -> Tuple[Tuple[Any], DictStrAny]:
"""Get original argument with which the injectable function was called"""
return injection_kwargs[_ORIGINAL_ARGS] # type: ignore


def create_resolved_partial(f: AnyFun, config: Optional[BaseConfiguration] = None) -> AnyFun:
Expand Down
Loading

0 comments on commit a00cb0a

Please sign in to comment.