From 61c2ed96053bd02632b87e2c85fa940a91a9d03b Mon Sep 17 00:00:00 2001 From: Steinthor Palsson Date: Sat, 30 Nov 2024 14:45:29 -0500 Subject: [PATCH] Incremental table hints and incremental in resource decorator (#2033) * Incremental table hints and incremental in resource decorator * Extract incremental settings to a dict in table schema * Support passing incremental settings to @resource decorator * Fix type errors * Reset incremental from_hints when set in resource decorator * Column hint * Merge multiple hints * Test non column match * adds make_hints test * Accept TIncrementalconfig in make hints * bool only incremental hint * Test jsonpath simple field name --------- Co-authored-by: Marcin Rudolf --- dlt/common/destination/typing.py | 6 +- dlt/common/incremental/__init__.py | 0 dlt/{extract => common}/incremental/typing.py | 12 +- dlt/common/pipeline.py | 3 +- dlt/common/schema/typing.py | 5 +- dlt/common/schema/utils.py | 11 + dlt/common/typing.py | 7 + .../impl/bigquery/bigquery_adapter.py | 6 +- .../impl/lancedb/lancedb_adapter.py | 3 +- .../impl/qdrant/qdrant_adapter.py | 3 +- .../impl/weaviate/weaviate_adapter.py | 3 +- dlt/extract/decorators.py | 19 +- dlt/extract/extract.py | 7 +- dlt/extract/hints.py | 36 ++- dlt/extract/incremental/__init__.py | 74 +++++- dlt/extract/incremental/transform.py | 5 +- dlt/extract/items.py | 12 +- dlt/extract/resource.py | 113 ++++++--- dlt/extract/utils.py | 11 +- dlt/pipeline/pipeline.py | 3 +- dlt/sources/rest_api/typing.py | 7 +- tests/common/test_jsonpath.py | 43 ++++ tests/common/test_validation.py | 4 +- tests/extract/test_extract.py | 42 ++++ tests/extract/test_incremental.py | 229 +++++++++++++++++- 25 files changed, 577 insertions(+), 87 deletions(-) create mode 100644 dlt/common/incremental/__init__.py rename dlt/{extract => common}/incremental/typing.py (66%) create mode 100644 tests/common/test_jsonpath.py diff --git a/dlt/common/destination/typing.py b/dlt/common/destination/typing.py index 
8cc08756cd..c79a2b0adc 100644 --- a/dlt/common/destination/typing.py +++ b/dlt/common/destination/typing.py @@ -1,6 +1,10 @@ from typing import Optional -from dlt.common.schema.typing import _TTableSchemaBase, TWriteDisposition, TTableReferenceParam +from dlt.common.schema.typing import ( + _TTableSchemaBase, + TWriteDisposition, + TTableReferenceParam, +) class PreparedTableSchema(_TTableSchemaBase, total=False): diff --git a/dlt/common/incremental/__init__.py b/dlt/common/incremental/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/extract/incremental/typing.py b/dlt/common/incremental/typing.py similarity index 66% rename from dlt/extract/incremental/typing.py rename to dlt/common/incremental/typing.py index 7b7786b529..460e2f234b 100644 --- a/dlt/extract/incremental/typing.py +++ b/dlt/common/incremental/typing.py @@ -2,9 +2,7 @@ from typing import Any, Callable, List, Literal, Optional, Sequence, TypeVar, Union -from dlt.common.schema.typing import TColumnNames -from dlt.common.typing import TSortOrder -from dlt.extract.items import TTableHintTemplate +from dlt.common.typing import TSortOrder, TTableHintTemplate, TColumnNames TCursorValue = TypeVar("TCursorValue", bound=Any) LastValueFunc = Callable[[Sequence[TCursorValue]], Any] @@ -19,10 +17,12 @@ class IncrementalColumnState(TypedDict): class IncrementalArgs(TypedDict, total=False): cursor_path: str - initial_value: Optional[str] - last_value_func: Optional[LastValueFunc[str]] + initial_value: Optional[Any] + last_value_func: Optional[Union[LastValueFunc[str], Literal["min", "max"]]] + """Last value callable or name of built in function""" primary_key: Optional[TTableHintTemplate[TColumnNames]] - end_value: Optional[str] + end_value: Optional[Any] row_order: Optional[TSortOrder] allow_external_schedulers: Optional[bool] lag: Optional[Union[float, int]] + on_cursor_value_missing: Optional[OnCursorValueMissing] diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index 
dba1036f85..9d3d5792ea 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -48,7 +48,6 @@ ) from dlt.common.schema import Schema from dlt.common.schema.typing import ( - TColumnNames, TColumnSchema, TWriteDispositionConfig, TSchemaContract, @@ -56,7 +55,7 @@ from dlt.common.storages.load_package import ParsedLoadJobFileName from dlt.common.storages.load_storage import LoadPackageInfo from dlt.common.time import ensure_pendulum_datetime, precise_time -from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize +from dlt.common.typing import DictStrAny, REPattern, StrAny, SupportsHumanize, TColumnNames from dlt.common.jsonpath import delete_matches, TAnyJsonPath from dlt.common.data_writers.writers import TLoaderFileFormat from dlt.common.utils import RowCounts, merge_row_counts diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index ed6c1c6d78..c8f5de03ed 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -19,7 +19,7 @@ from dlt.common.data_types import TDataType from dlt.common.normalizers.typing import TNormalizersConfig -from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat +from dlt.common.typing import TSortOrder, TAnyDateTime, TLoaderFileFormat, TColumnNames try: from pydantic import BaseModel as _PydanticBaseModel @@ -132,8 +132,6 @@ class TColumnPropInfo(NamedTuple): "timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double" ] TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]] -TColumnNames = Union[str, Sequence[str]] -"""A string representing a column name or a list of""" class TColumnType(TypedDict, total=False): @@ -166,6 +164,7 @@ class TColumnSchema(TColumnSchemaBase, total=False): variant: Optional[bool] hard_delete: Optional[bool] dedup_sort: Optional[TSortOrder] + incremental: Optional[bool] TTableSchemaColumns = Dict[str, TColumnSchema] diff --git a/dlt/common/schema/utils.py 
b/dlt/common/schema/utils.py index e2e1f959dc..038abdc4d0 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -547,6 +547,17 @@ def merge_diff(table: TTableSchema, table_diff: TPartialTableSchema) -> TPartial * table hints are added or replaced from diff * nothing gets deleted """ + + incremental_a_col = get_first_column_name_with_prop( + table, "incremental", include_incomplete=True + ) + if incremental_a_col: + incremental_b_col = get_first_column_name_with_prop( + table_diff, "incremental", include_incomplete=True + ) + if incremental_b_col: + table["columns"][incremental_a_col].pop("incremental") + # add new columns when all checks passed updated_columns = merge_columns(table["columns"], table_diff["columns"]) table.update(table_diff) diff --git a/dlt/common/typing.py b/dlt/common/typing.py index 94edb57194..a3364d1b07 100644 --- a/dlt/common/typing.py +++ b/dlt/common/typing.py @@ -29,6 +29,7 @@ Iterator, Generator, NamedTuple, + Sequence, ) from typing_extensions import ( @@ -112,6 +113,8 @@ class SecretSentinel: TSecretStrValue = Annotated[str, SecretSentinel] +TColumnNames = Union[str, Sequence[str]] +"""A string representing a column name or a list of""" TDataItem: TypeAlias = Any """A single data item as extracted from data source""" TDataItems: TypeAlias = Union[TDataItem, List[TDataItem]] @@ -126,6 +129,10 @@ class SecretSentinel: TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "insert_values", "parquet", "csv", "reference"] """known loader file formats""" +TDynHintType = TypeVar("TDynHintType") +TFunHintTemplate = Callable[[TDataItem], TDynHintType] +TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]] + class ConfigValueSentinel(NamedTuple): """Class to create singleton sentinel for config and secret injected value""" diff --git a/dlt/destinations/impl/bigquery/bigquery_adapter.py b/dlt/destinations/impl/bigquery/bigquery_adapter.py index 5f6a1fab85..05b26530d9 100644 --- 
a/dlt/destinations/impl/bigquery/bigquery_adapter.py +++ b/dlt/destinations/impl/bigquery/bigquery_adapter.py @@ -4,10 +4,8 @@ from dlt.common.destination import PreparedTableSchema from dlt.common.pendulum import timezone -from dlt.common.schema.typing import ( - TColumnNames, - TTableSchemaColumns, -) +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate diff --git a/dlt/destinations/impl/lancedb/lancedb_adapter.py b/dlt/destinations/impl/lancedb/lancedb_adapter.py index 4314dd703f..d192168d0a 100644 --- a/dlt/destinations/impl/lancedb/lancedb_adapter.py +++ b/dlt/destinations/impl/lancedb/lancedb_adapter.py @@ -1,6 +1,7 @@ from typing import Any, Dict -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.destinations.utils import get_resource_for_adapter from dlt.extract import DltResource from dlt.extract.items import TTableHintTemplate diff --git a/dlt/destinations/impl/qdrant/qdrant_adapter.py b/dlt/destinations/impl/qdrant/qdrant_adapter.py index bbc2d719a8..5a5a44965c 100644 --- a/dlt/destinations/impl/qdrant/qdrant_adapter.py +++ b/dlt/destinations/impl/qdrant/qdrant_adapter.py @@ -1,6 +1,7 @@ from typing import Any -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.extract import DltResource from dlt.destinations.utils import get_resource_for_adapter diff --git a/dlt/destinations/impl/weaviate/weaviate_adapter.py b/dlt/destinations/impl/weaviate/weaviate_adapter.py index 0ca9047528..329d13c493 100644 --- a/dlt/destinations/impl/weaviate/weaviate_adapter.py +++ 
b/dlt/destinations/impl/weaviate/weaviate_adapter.py @@ -1,6 +1,7 @@ from typing import Dict, Any, Literal, Set, get_args -from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns +from dlt.common.schema.typing import TTableSchemaColumns +from dlt.common.typing import TColumnNames from dlt.extract import DltResource, resource as make_resource from dlt.destinations.utils import get_resource_for_adapter diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index 63140e8f78..f8703e1452 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -32,7 +32,6 @@ from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION from dlt.common.schema.schema import Schema from dlt.common.schema.typing import ( - TColumnNames, TFileFormat, TWriteDisposition, TWriteDispositionConfig, @@ -43,7 +42,8 @@ ) from dlt.common.storages.exceptions import SchemaNotFoundError from dlt.common.storages.schema_storage import SchemaStorage -from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems +from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems, TColumnNames + from dlt.common.utils import get_callable_name, get_module_name, is_inner_callable from dlt.extract.hints import make_hints @@ -70,6 +70,7 @@ TSourceFunParams, ) from dlt.extract.resource import DltResource, TUnboundDltResource, TDltResourceImpl +from dlt.extract.incremental import TIncrementalConfig @configspec @@ -446,6 +447,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> TDltResourceImpl: ... 
@@ -468,6 +470,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl]: ... @@ -490,6 +493,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] standalone: Literal[True] = True, ) -> Callable[ @@ -515,6 +519,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] ) -> TDltResourceImpl: ... @@ -536,6 +541,7 @@ def resource( selected: bool = True, spec: Type[BaseConfiguration] = None, parallelized: bool = False, + incremental: Optional[TIncrementalConfig] = None, _impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment] standalone: bool = False, data_from: TUnboundDltResource = None, @@ -632,6 +638,7 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl: table_format=table_format, file_format=file_format, references=references, + incremental=incremental, ) resource = _impl_cls.from_data( @@ -643,6 +650,10 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl: cast(DltResource, data_from), True, ) + + if incremental: + # Reset the flag to allow overriding by incremental argument + resource.incremental._from_hints = False # If custom nesting level was specified then # we need to add it to table hints so that # later in normalizer dlt/common/normalizers/json/relational.py @@ -681,7 +692,7 @@ def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl: return _wrap def decorator( - f: 
Callable[TResourceFunParams, Any] + f: Callable[TResourceFunParams, Any], ) -> Callable[TResourceFunParams, TDltResourceImpl]: if not callable(f): if data_from: @@ -1023,7 +1034,7 @@ def get_source() -> DltSource: def defer( - f: Callable[TDeferredFunParams, TBoundItems] + f: Callable[TDeferredFunParams, TBoundItems], ) -> Callable[TDeferredFunParams, TDeferred[TBoundItems]]: @wraps(f) def _wrap(*args: Any, **kwargs: Any) -> TDeferred[TBoundItems]: diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index e65f6cf0d0..25c3a0dbae 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -2,7 +2,7 @@ from collections.abc import Sequence as C_Sequence from copy import copy import itertools -from typing import Iterator, List, Dict, Any, Optional +from typing import Iterator, List, Dict, Any, Optional, Mapping import yaml from dlt.common.configuration.container import Container @@ -17,13 +17,12 @@ WithStepInfo, reset_resource_state, ) -from dlt.common.typing import DictStrAny +from dlt.common.typing import DictStrAny, TColumnNames from dlt.common.runtime import signals from dlt.common.runtime.collector import Collector, NULL_COLLECTOR from dlt.common.schema import Schema, utils from dlt.common.schema.typing import ( TAnySchemaColumns, - TColumnNames, TSchemaContract, TTableFormat, TWriteDispositionConfig, @@ -39,7 +38,7 @@ from dlt.extract.decorators import SourceInjectableContext, SourceSchemaInjectableContext from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints -from dlt.extract.incremental import IncrementalResourceWrapper +from dlt.extract.incremental import IncrementalResourceWrapper, Incremental from dlt.extract.pipe_iterator import PipeIterator from dlt.extract.source import DltSource from dlt.extract.resource import DltResource diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 5daabd0c6a..000e5c4cdb 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -1,10 +1,9 @@ -from typing import TypedDict, cast, 
Any, Optional, Dict, Sequence, Mapping +from typing import TypedDict, cast, Any, Optional, Dict, Sequence, Mapping, Union from typing_extensions import Self from dlt.common import logger from dlt.common.schema.typing import ( C_DLT_ID, - TColumnNames, TColumnProp, TFileFormat, TPartialTableSchema, @@ -28,7 +27,7 @@ new_column, new_table, ) -from dlt.common.typing import TDataItem +from dlt.common.typing import TDataItem, TColumnNames from dlt.common.time import ensure_pendulum_datetime from dlt.common.utils import clone_dict_nested from dlt.common.normalizers.json.relational import DataItemNormalizer @@ -37,7 +36,7 @@ DataItemRequiredForDynamicTableHints, InconsistentTableTemplate, ) -from dlt.extract.incremental import Incremental +from dlt.extract.incremental import Incremental, TIncrementalConfig from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta, ValidateItem from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint from dlt.extract.validation import create_item_validator @@ -86,6 +85,7 @@ def make_hints( table_format: TTableHintTemplate[TTableFormat] = None, file_format: TTableHintTemplate[TFileFormat] = None, references: TTableHintTemplate[TTableReferenceParam] = None, + incremental: TIncrementalConfig = None, ) -> TResourceHints: """A convenience function to create resource hints. Accepts both static and dynamic hints based on data. 
@@ -119,6 +119,8 @@ def make_hints( if validator: new_template["validator"] = validator DltResourceHints.validate_dynamic_hints(new_template) + if incremental is not None: # TODO: Validate + new_template["incremental"] = Incremental.ensure_instance(incremental) return new_template @@ -204,6 +206,10 @@ def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTab for k, v in table_template.items() if k not in NATURAL_CALLABLES } # type: ignore + if "incremental" in table_template: + incremental = table_template["incremental"] + if isinstance(incremental, Incremental) and incremental is not Incremental.EMPTY: + resolved_template["incremental"] = incremental table_schema = self._create_table_schema(resolved_template, self.name) migrate_complex_types(table_schema, warn=True) validate_dict_ignoring_xkeys( @@ -221,7 +227,7 @@ def apply_hints( columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, merge_key: TTableHintTemplate[TColumnNames] = None, - incremental: Incremental[Any] = None, + incremental: TIncrementalConfig = None, schema_contract: TTableHintTemplate[TSchemaContract] = None, additional_table_hints: Optional[Dict[str, TTableHintTemplate[Any]]] = None, table_format: TTableHintTemplate[TTableFormat] = None, @@ -360,7 +366,7 @@ def apply_hints( # set properties that can't be passed to make_hints if incremental is not None: - t["incremental"] = incremental + t["incremental"] = Incremental.ensure_instance(incremental) self._set_hints(t, create_table_variant) return self @@ -506,6 +512,22 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: "row_key": False, } + @staticmethod + def _merge_incremental_column_hint(dict_: Dict[str, Any]) -> None: + incremental = dict_.pop("incremental") + if incremental is None: + return + col_name = incremental.get_cursor_column_name() + if not col_name: + # cursor cannot resolve to a single column, no hint added + return + incremental_col = 
dict_["columns"].get(col_name) + if not incremental_col: + incremental_col = {"name": col_name} + + incremental_col["incremental"] = True + dict_["columns"][col_name] = incremental_col + @staticmethod def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> TTableSchema: """Creates table schema from resource hints and resource name. Resource hints are resolved @@ -518,6 +540,8 @@ def _create_table_schema(resource_hints: TResourceHints, resource_name: str) -> "disposition": resource_hints["write_disposition"] } # wrap in dict DltResourceHints._merge_write_disposition_dict(resource_hints) # type: ignore[arg-type] + if "incremental" in resource_hints: + DltResourceHints._merge_incremental_column_hint(resource_hints) # type: ignore[arg-type] dict_ = cast(TTableSchema, resource_hints) dict_["resource"] = resource_name return dict_ diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 69af0d68a6..28d33bb71f 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -1,6 +1,6 @@ import os from datetime import datetime # noqa: I251 -from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union +from typing import Generic, ClassVar, Any, Optional, Type, Dict, Union, Literal, Tuple from typing_extensions import get_args import inspect @@ -9,7 +9,7 @@ from dlt.common import logger from dlt.common.exceptions import MissingDependencyException from dlt.common.pendulum import pendulum -from dlt.common.jsonpath import compile_path +from dlt.common.jsonpath import compile_path, extract_simple_field_name from dlt.common.typing import ( TDataItem, TDataItems, @@ -19,8 +19,8 @@ get_generic_type_argument_from_instance, is_optional_type, is_subclass, + TColumnNames, ) -from dlt.common.schema.typing import TColumnNames from dlt.common.configuration import configspec, ConfigurationValueError from dlt.common.configuration.specs import BaseConfiguration from dlt.common.pipeline import 
resource_state @@ -29,17 +29,19 @@ coerce_value, py_type_to_sc_type, ) +from dlt.common.utils import without_none from dlt.extract.exceptions import IncrementalUnboundError from dlt.extract.incremental.exceptions import ( IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, ) -from dlt.extract.incremental.typing import ( +from dlt.common.incremental.typing import ( IncrementalColumnState, TCursorValue, LastValueFunc, OnCursorValueMissing, + IncrementalArgs, ) from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform from dlt.extract.incremental.transform import ( @@ -123,7 +125,7 @@ def __init__( self, cursor_path: str = None, initial_value: Optional[TCursorValue] = None, - last_value_func: Optional[LastValueFunc[TCursorValue]] = max, + last_value_func: Optional[Union[LastValueFunc[TCursorValue], Literal["min", "max"]]] = max, primary_key: Optional[TTableHintTemplate[TColumnNames]] = None, end_value: Optional[TCursorValue] = None, row_order: Optional[TSortOrder] = None, @@ -135,6 +137,16 @@ def __init__( if cursor_path: compile_path(cursor_path) self.cursor_path = cursor_path + if isinstance(last_value_func, str): + if last_value_func == "min": + last_value_func = min + elif last_value_func == "max": + last_value_func = max + else: + raise ValueError( + f"Unknown last_value_func '{last_value_func}' passed as string. Provide a" + " callable to use a custom function." 
+ ) self.last_value_func = last_value_func self.initial_value = initial_value """Initial value of last_value""" @@ -247,6 +259,10 @@ def copy(self) -> "Incremental[TCursorValue]": # merge creates a copy return self.merge(self) + def get_cursor_column_name(self) -> Optional[str]: + """Return the name of the cursor column if the cursor path resolves to a single column""" + return extract_simple_field_name(self.cursor_path) + def on_resolved(self) -> None: compile_path(self.cursor_path) if self.end_value is not None and self.initial_value is None: @@ -491,6 +507,12 @@ def can_close(self) -> bool: and self.start_out_of_range ) + @classmethod + def ensure_instance(cls, value: "TIncrementalConfig") -> "Incremental[TCursorValue]": + if isinstance(value, Incremental): + return value + return cls(**value) + def __str__(self) -> str: return ( f"Incremental at 0x{id(self):x} for resource {self.resource_name} with cursor path:" @@ -511,7 +533,6 @@ def _get_transformer(self, items: TDataItems) -> IncrementalTransform: def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]: if rows is None: return rows - transformer = self._get_transformer(rows) if isinstance(rows, list): rows = [ @@ -556,6 +577,8 @@ def _check_duplicate_cursor_threshold( Incremental.EMPTY = Incremental[Any]() Incremental.EMPTY.__is_resolved__ = True +TIncrementalConfig = Union[Incremental[Any], IncrementalArgs] + class IncrementalResourceWrapper(ItemTransform[TDataItem]): placement_affinity: ClassVar[float] = 1 # stick to end @@ -595,6 +618,34 @@ def get_incremental_arg(sig: inspect.Signature) -> Optional[inspect.Parameter]: break return incremental_param + @staticmethod + def inject_implicit_incremental_arg( + incremental: Optional[Union[Incremental[Any], "IncrementalResourceWrapper"]], + sig: inspect.Signature, + func_args: Tuple[Any], + func_kwargs: Dict[str, Any], + fallback: Optional[Incremental[Any]] = None, + ) -> Tuple[Tuple[Any], Dict[str, Any], Optional[Incremental[Any]]]: + 
"""Inject the incremental instance into function arguments + if the function has an incremental argument without default in its signature and it is not already set in the arguments. + + Returns: + Tuple of the new args, kwargs and the incremental instance that was injected (if any) + """ + if isinstance(incremental, IncrementalResourceWrapper): + incremental = incremental.incremental + if not incremental: + if not fallback: + return func_args, func_kwargs, None + incremental = fallback + incremental_param = IncrementalResourceWrapper.get_incremental_arg(sig) + if incremental_param: + bound_args = sig.bind_partial(*func_args, **func_kwargs) + if not bound_args.arguments.get(incremental_param.name): + bound_args.arguments[incremental_param.name] = incremental + return bound_args.args, bound_args.kwargs, incremental + return func_args, func_kwargs, None + def wrap(self, sig: inspect.Signature, func: TFun) -> TFun: """Wrap the callable to inject an `Incremental` object configured for the resource.""" incremental_param = self.get_incremental_arg(sig) @@ -666,12 +717,14 @@ def incremental(self) -> Optional[Incremental[Any]]: return self._incremental def set_incremental( - self, incremental: Optional[Incremental[Any]], from_hints: bool = False + self, incremental: Optional[TIncrementalConfig], from_hints: bool = False ) -> None: """Sets the incremental. 
If incremental was set from_hints, it can only be changed in the same manner""" if self._from_hints and not from_hints: # do not accept incremental if apply hints were used return + if incremental is not None: + incremental = Incremental.ensure_instance(incremental) self._from_hints = from_hints self._incremental = incremental @@ -710,6 +763,12 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: return self._incremental(item, meta) +def incremental_config_to_instance(cfg: TIncrementalConfig) -> Incremental[Any]: + if isinstance(cfg, Incremental): + return cfg + return Incremental(**cfg) + + __all__ = [ "Incremental", "IncrementalResourceWrapper", @@ -717,6 +776,7 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: "IncrementalCursorPathMissing", "IncrementalPrimaryKeyMissing", "IncrementalUnboundError", + "TIncrementalConfig", "LastValueFunc", "TCursorValue", ] diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 842c8aebe8..22b1194b51 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -5,7 +5,7 @@ from dlt.common.utils import digest128 from dlt.common.json import json from dlt.common.pendulum import pendulum -from dlt.common.typing import TDataItem +from dlt.common.typing import TDataItem, TColumnNames from dlt.common.jsonpath import find_values, compile_path, extract_simple_field_name from dlt.extract.incremental.exceptions import ( IncrementalCursorInvalidCoercion, @@ -13,10 +13,9 @@ IncrementalPrimaryKeyMissing, IncrementalCursorPathHasValueNone, ) -from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing +from dlt.common.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing from dlt.extract.utils import resolve_column_value from dlt.extract.items import TTableHintTemplate -from dlt.common.schema.typing import TColumnNames try: from dlt.common.libs import 
pyarrow diff --git a/dlt/extract/items.py b/dlt/extract/items.py index d721e8094e..888787e6b7 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -19,7 +19,14 @@ ) from concurrent.futures import Future -from dlt.common.typing import TAny, TDataItem, TDataItems +from dlt.common.typing import ( + TAny, + TDataItem, + TDataItems, + TTableHintTemplate, + TFunHintTemplate, + TDynHintType, +) TDecompositionStrategy = Literal["none", "scc"] @@ -27,9 +34,6 @@ TAwaitableDataItems = Awaitable[TDataItems] TPipedDataItems = Union[TDataItems, TDeferredDataItems, TAwaitableDataItems] -TDynHintType = TypeVar("TDynHintType") -TFunHintTemplate = Callable[[TDataItem], TDynHintType] -TTableHintTemplate = Union[TDynHintType, TFunHintTemplate[TDynHintType]] if TYPE_CHECKING: TItemFuture = Future[TPipedDataItems] diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index c6ca1660f4..42e3905162 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -11,6 +11,7 @@ Union, Any, Optional, + Mapping, ) from typing_extensions import TypeVar, Self @@ -28,6 +29,7 @@ pipeline_state, ) from dlt.common.utils import flatten_list_or_items, get_callable_name, uniq_id +from dlt.common.schema.typing import TTableSchema from dlt.extract.utils import wrap_async_iterator, wrap_parallel_iterator from dlt.extract.items import ( @@ -42,7 +44,7 @@ ) from dlt.extract.pipe_iterator import ManagedPipeIterator from dlt.extract.pipe import Pipe, TPipeStep -from dlt.extract.hints import DltResourceHints, HintsMeta, TResourceHints +from dlt.extract.hints import DltResourceHints, HintsMeta, TResourceHints, make_hints from dlt.extract.incremental import Incremental, IncrementalResourceWrapper from dlt.extract.exceptions import ( InvalidTransformerDataTypeGeneratorFunctionRequired, @@ -442,35 +444,60 @@ def add_step( self._pipe.insert_step(item_transform, insert_at) return self + def _remove_incremental_step(self) -> None: + step_no = self._pipe.find(Incremental, 
IncrementalResourceWrapper) + if step_no >= 0: + self._pipe.remove_step(step_no) + + def set_incremental( + self, + new_incremental: Union[Incremental[Any], IncrementalResourceWrapper], + from_hints: bool = False, + ) -> Optional[Union[Incremental[Any], IncrementalResourceWrapper]]: + """Set/replace the incremental transform for the resource. + + Args: + new_incremental: The Incremental instance/hint to set or replace + from_hints: If the incremental is set from hints. Defaults to False. + """ + if new_incremental is Incremental.EMPTY: + new_incremental = None + incremental = self.incremental + if incremental is not None: + # if isinstance(new_incremental, Mapping): + # new_incremental = Incremental.ensure_instance(new_incremental) + + if isinstance(new_incremental, IncrementalResourceWrapper): + # Completely replace the wrapper + self._remove_incremental_step() + self.add_step(new_incremental) + elif isinstance(incremental, IncrementalResourceWrapper): + incremental.set_incremental(new_incremental, from_hints=from_hints) + else: + self._remove_incremental_step() + # re-add the step + incremental = None + if incremental is None: + # if there's no wrapper add incremental as a transform + if new_incremental: + if not isinstance(new_incremental, IncrementalResourceWrapper): + new_incremental = Incremental.ensure_instance(new_incremental) + self.add_step(new_incremental) + return new_incremental + def _set_hints( self, table_schema_template: TResourceHints, create_table_variant: bool = False ) -> None: super()._set_hints(table_schema_template, create_table_variant) # validators and incremental apply only to resource hints if not create_table_variant: - incremental = self.incremental # try to late assign incremental if table_schema_template.get("incremental") is not None: - new_incremental = table_schema_template["incremental"] - # remove incremental if empty - if new_incremental is Incremental.EMPTY: - new_incremental = None - - if incremental is not None: - if 
isinstance(incremental, IncrementalResourceWrapper): - # replace in wrapper - incremental.set_incremental(new_incremental, from_hints=True) - else: - step_no = self._pipe.find(Incremental) - self._pipe.remove_step(step_no) - # re-add the step - incremental = None - - if incremental is None: - # if there's no wrapper add incremental as a transform - incremental = new_incremental # type: ignore - if new_incremental: - self.add_step(new_incremental) + incremental = self.set_incremental( + table_schema_template["incremental"], from_hints=True + ) + else: + incremental = self.incremental if incremental: primary_key = table_schema_template.get("primary_key", incremental.primary_key) @@ -480,10 +507,25 @@ def _set_hints( if table_schema_template.get("validator") is not None: self.validator = table_schema_template["validator"] + def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema: + incremental: Optional[Union[Incremental[Any], IncrementalResourceWrapper]] = ( + self.incremental + ) + if incremental and "incremental" not in self._hints: + if isinstance(incremental, IncrementalResourceWrapper): + incremental = incremental.incremental + if incremental: + self._hints["incremental"] = incremental + + table_schema = super().compute_table_schema(item, meta) + + return table_schema + def bind(self: TDltResourceImpl, *args: Any, **kwargs: Any) -> TDltResourceImpl: """Binds the parametrized resource to passed arguments. Modifies resource pipe in place. 
Does not evaluate generators or iterators.""" if self._args_bound: raise TypeError(f"Parametrized resource {self.name} is not callable") + orig_gen = self._pipe.gen gen = self._pipe.bind_gen(*args, **kwargs) if isinstance(gen, DltResource): @@ -599,14 +641,14 @@ def _eject_config(self) -> bool: if not self._pipe.is_empty and not self._args_bound: orig_gen = getattr(self._pipe.gen, "__GEN__", None) if orig_gen: - step_no = self._pipe.find(IncrementalResourceWrapper) - if step_no >= 0: - self._pipe.remove_step(step_no) + self._remove_incremental_step() self._pipe.replace_gen(orig_gen) return True return False - def _inject_config(self) -> "DltResource": + def _inject_config( + self, incremental_from_hints_override: Optional[bool] = None + ) -> "DltResource": """Wraps the pipe generation step in incremental and config injection wrappers and adds pipe step with Incremental transform. """ @@ -618,8 +660,17 @@ def _inject_config(self) -> "DltResource": sig = inspect.signature(gen) if IncrementalResourceWrapper.should_wrap(sig): incremental = IncrementalResourceWrapper(self._hints.get("primary_key")) + if incr_hint := self._hints.get("incremental"): + incremental.set_incremental( + incr_hint, + from_hints=( + incremental_from_hints_override + if incremental_from_hints_override is not None + else True + ), + ) incr_f = incremental.wrap(sig, gen) - self.add_step(incremental) + self.set_incremental(incremental) else: incr_f = gen resource_sections = (known_sections.SOURCES, self.section, self.name) @@ -649,6 +700,12 @@ def _clone( if self._pipe and not self._pipe.is_empty: pipe = pipe._clone(new_name=new_name, with_parent=with_parent) # incremental and parent are already in the pipe (if any) + + incremental = self.incremental + if isinstance(incremental, IncrementalResourceWrapper): + incremental_from_hints: Optional[bool] = incremental._from_hints + else: + incremental_from_hints = None r_ = self.__class__( pipe, self._clone_hints(self._hints), @@ -661,7 +718,7 @@ def 
_clone( # this makes sure that a take config values from a right section and wrapper has a separated # instance in the pipeline if r_._eject_config(): - r_._inject_config() + r_._inject_config(incremental_from_hints_override=incremental_from_hints) return r_ def _get_config_section_context(self) -> ConfigSectionContext: diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 55a8b0b8c4..68570d0995 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -22,8 +22,15 @@ from dlt.common.data_writers import TDataItemFormat from dlt.common.exceptions import MissingDependencyException from dlt.common.pipeline import reset_resource_state -from dlt.common.schema.typing import TColumnNames, TAnySchemaColumns, TTableSchemaColumns -from dlt.common.typing import AnyFun, DictStrAny, TDataItem, TDataItems, TAnyFunOrGenerator +from dlt.common.schema.typing import TAnySchemaColumns, TTableSchemaColumns +from dlt.common.typing import ( + AnyFun, + DictStrAny, + TDataItem, + TDataItems, + TAnyFunOrGenerator, + TColumnNames, +) from dlt.common.utils import get_callable_name from dlt.extract.exceptions import ( diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index a9f07d417e..70d160ea67 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -38,7 +38,6 @@ from dlt.common.exceptions import MissingDependencyException from dlt.common.runtime import signals, apply_runtime_config from dlt.common.schema.typing import ( - TColumnNames, TSchemaTables, TTableFormat, TWriteDispositionConfig, @@ -47,7 +46,7 @@ ) from dlt.common.schema.utils import normalize_schema_name from dlt.common.storages.exceptions import LoadPackageNotFound -from dlt.common.typing import ConfigValue, TFun, TSecretStrValue, is_optional_type +from dlt.common.typing import ConfigValue, TFun, TSecretStrValue, is_optional_type, TColumnNames from dlt.common.runners import pool_runner as runner from dlt.common.storages import ( LiveSchemaStorage, diff --git 
a/dlt/sources/rest_api/typing.py b/dlt/sources/rest_api/typing.py index ccef828b1a..c48e54de4a 100644 --- a/dlt/sources/rest_api/typing.py +++ b/dlt/sources/rest_api/typing.py @@ -15,7 +15,7 @@ from dlt.common.schema.typing import ( TAnySchemaColumns, ) -from dlt.extract.incremental.typing import IncrementalArgs +from dlt.common.incremental.typing import IncrementalArgs from dlt.extract.items import TTableHintTemplate from dlt.extract.hints import TResourceHintsBase from dlt.sources.helpers.rest_client.auth import AuthConfigBase, TApiKeyLocation @@ -23,9 +23,8 @@ from dataclasses import dataclass, field from dlt.common import jsonpath -from dlt.common.typing import TSortOrder +from dlt.common.typing import TSortOrder, TColumnNames from dlt.common.schema.typing import ( - TColumnNames, TTableFormat, TAnySchemaColumns, TWriteDispositionConfig, @@ -33,7 +32,7 @@ ) from dlt.extract.items import TTableHintTemplate -from dlt.extract.incremental.typing import LastValueFunc +from dlt.common.incremental.typing import LastValueFunc from dlt.extract.resource import DltResource from requests import Session diff --git a/tests/common/test_jsonpath.py b/tests/common/test_jsonpath.py new file mode 100644 index 0000000000..c4e9fbc664 --- /dev/null +++ b/tests/common/test_jsonpath.py @@ -0,0 +1,43 @@ +import pytest + +from dlt.common import jsonpath as jp + + +@pytest.mark.parametrize("compiled", [True, False]) +@pytest.mark.parametrize( + "path, expected", + [ + ("col_a", "col_a"), + ("'col.a'", "col.a"), + ("'$col_a'", "$col_a"), + ("'col|a'", "col|a"), + ], +) +def test_extract_simple_field_name_positive(path, expected, compiled): + if compiled: + path = jp.compile_path(path) + + result = jp.extract_simple_field_name(path) + assert result == expected + + +@pytest.mark.parametrize("compiled", [True, False]) +@pytest.mark.parametrize( + "path", + [ + "$.col_a", + "$.col_a.items", + "$.col_a.items[0]", + "$.col_a.items[*]", + "col_a|col_b", + ], +) +def 
test_extract_simple_field_name_negative(path, compiled): + if compiled: + path = jp.compile_path(path) + + result = jp.extract_simple_field_name(path) + assert result is None + + +# TODO: Test all jsonpath utils diff --git a/tests/common/test_validation.py b/tests/common/test_validation.py index 3f8ccfc20f..f3ebb02b46 100644 --- a/tests/common/test_validation.py +++ b/tests/common/test_validation.py @@ -19,13 +19,13 @@ from dlt.common import Decimal, jsonpath from dlt.common.exceptions import DictValidationException from dlt.common.schema.typing import ( - TColumnNames, TStoredSchema, TColumnSchema, TWriteDispositionConfig, ) from dlt.common.schema.utils import simple_regex_validator -from dlt.common.typing import DictStrStr, StrStr, TDataItem, TSortOrder +from dlt.common.typing import DictStrStr, StrStr, TDataItem, TSortOrder, TColumnNames + from dlt.common.validation import validate_dict, validate_dict_ignoring_xkeys diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index dbec417f97..9343449aed 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -213,6 +213,48 @@ def with_table_hints(): extract_step.extract(source, 20, 1) +def test_extract_hints_mark_incremental(extract_step: Extract) -> None: + os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE" + + @dlt.resource(columns=[{"name": "id", "data_type": "bigint"}], primary_key="id") + def with_table_hints(): + # yield a regular dataset first, simulate backfil + yield [{"id": id_, "pk": "A"} for id_ in range(1, 10)] + + # get the resource + resource = dlt.current.source().resources[dlt.current.resource_name()] + table = resource.compute_table_schema() + # also there we see the hints + assert table["columns"]["id"]["primary_key"] is True + assert table["columns"]["id"]["data_type"] == "bigint" + + # start emitting incremental + yield dlt.mark.with_hints( + [{"id": id_, "pk": "A", "created_at": id_ + 10} for id_ in range(100, 110)], + 
make_hints(incremental=dlt.sources.incremental("created_at", initial_value=105)), + ) + + # get the resource + resource = dlt.current.source().resources[dlt.current.resource_name()] + assert resource.incremental.cursor_path == "created_at" # type: ignore[attr-defined] + assert resource.incremental.primary_key == "id" + # we are able to add the incremental to the pipe. but it won't + # join actually executing pipe which is a clone of a (partial) pipe of the resource + assert isinstance(resource._pipe._steps[1], dlt.sources.incremental) + # NOTE: this results in unbounded exception + # assert resource.incremental.last_value == 299 + table = resource.compute_table_schema() + assert table["columns"]["created_at"]["incremental"] is not None + + yield [{"id": id_, "pk": "A", "created_at": id_ + 10} for id_ in range(110, 120)] + + source = DltSource(dlt.Schema("hintable"), "module", [with_table_hints]) + extract_step.extract(source, 20, 1) + # make sure incremental is in the source schema + table = source.schema.get_table("with_table_hints") + assert table["columns"]["created_at"]["incremental"] is not None + + def test_extract_metrics_on_exception_no_flush(extract_step: Extract) -> None: @dlt.resource def letters(): diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 7ce4228b6c..30df12ae17 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -5,7 +5,7 @@ from datetime import datetime, date # noqa: I251 from itertools import chain, count from time import sleep -from typing import Any, Optional +from typing import Any, Optional, Literal, Sequence, Dict from unittest import mock import duckdb @@ -1468,10 +1468,13 @@ def test_apply_hints_incremental(item_type: TestDataItemFormat) -> None: data = [{"created_at": 1}, {"created_at": 2}, {"created_at": 3}] source_items = data_to_item_format(item_type, data) + should_have_arg = True + @dlt.resource def some_data(created_at: 
Optional[dlt.sources.incremental[int]] = None): # make sure that incremental from apply_hints is here - if created_at is not None: + if should_have_arg: + assert created_at is not None assert created_at.cursor_path == "created_at" assert created_at.last_value_func is max yield source_items @@ -1505,6 +1508,7 @@ def some_data(created_at: Optional[dlt.sources.incremental[int]] = None): assert list(r) == [] # remove incremental + should_have_arg = False r.apply_hints(incremental=dlt.sources.incremental.EMPTY) assert r.incremental is not None assert r.incremental.incremental is None @@ -1515,6 +1519,7 @@ def some_data(created_at: Optional[dlt.sources.incremental[int]] = None): # as above but we provide explicit incremental when creating resource p = p.drop() + should_have_arg = True r = some_data(created_at=dlt.sources.incremental("created_at", last_value_func=min)) # hints have precedence, as expected r.apply_hints(incremental=dlt.sources.incremental("created_at", last_value_func=max)) @@ -3568,3 +3573,223 @@ def some_data( call for call in logger_spy.call_args_list if "Large number of records" in call.args[0] ] assert len(warning_calls) == 1 + + +def _resource_for_table_hint( + hint_type: Literal[ + "default_arg", "explicit_arg", "apply_hints", "default_arg_override", "decorator" + ], + data: Sequence[Dict[str, Any]], + incremental_arg: dlt.sources.incremental[Any], + incremental_arg_default: dlt.sources.incremental[Any] = None, +) -> DltResource: + if incremental_arg is None and incremental_arg_default is None: + raise ValueError("One of the incremental arguments must be provided.") + + decorator_arg = None + if hint_type == "default_arg": + default_arg = incremental_arg_default + override_arg = None + elif hint_type == "default_arg_override": + default_arg = incremental_arg_default + override_arg = incremental_arg + elif hint_type == "decorator": + default_arg = None + override_arg = None + decorator_arg = incremental_arg_default + else: + default_arg = None + 
override_arg = incremental_arg + + @dlt.resource(incremental=decorator_arg) + def some_data( + updated_at: dlt.sources.incremental[Any] = default_arg, + ) -> Any: + yield data_to_item_format("object", data) + + if override_arg is None: + return some_data() + + if hint_type == "apply_hints": + rs = some_data() + rs.apply_hints(incremental=override_arg) + return rs + + return some_data(updated_at=override_arg) + + +@pytest.mark.parametrize( + "hint_type", ["default_arg", "explicit_arg", "apply_hints", "default_arg_override", "decorator"] +) +@pytest.mark.parametrize( + "incremental_settings", + [ + { + "last_value_func": "min", + "row_order": "desc", + "on_cursor_value_missing": "include", + }, + {"last_value_func": "max", "on_cursor_value_missing": "raise"}, + ], +) +def test_incremental_table_hint_datetime_column( + hint_type: Literal[ + "default_arg", + "explicit_arg", + "default_arg_override", + "apply_hints", + "decorator", + ], + incremental_settings: Dict[str, Any], +) -> None: + initial_value_override = pendulum.now() + initial_value_default = pendulum.now().subtract(seconds=10) + rs = _resource_for_table_hint( + hint_type, + [{"updated_at": pendulum.now().add(seconds=i)} for i in range(1, 12)], + dlt.sources.incremental( + "updated_at", initial_value=initial_value_override, **incremental_settings + ), + dlt.sources.incremental( + "updated_at", initial_value=initial_value_default, **incremental_settings + ), + ) + + pipeline = dlt.pipeline(pipeline_name=uniq_id()) + pipeline.extract(rs) + + table_schema = pipeline.default_schema.tables["some_data"] + + assert table_schema["columns"]["updated_at"]["incremental"] is True + + +def incremental_instance_or_dict(use_dict: bool, **kwargs): + if use_dict: + return kwargs + return dlt.sources.incremental(**kwargs) + + +@pytest.mark.parametrize("use_dict", [False, True]) +def test_incremental_in_resource_decorator(use_dict: bool) -> None: + # Incremental set in decorator, without any arguments + @dlt.resource( + 
incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def no_incremental_arg(): + yield [{"value": i} for i in range(10)] + + result = list(no_incremental_arg()) + # filtering is applied + assert result == [{"value": i} for i in range(0, 6)] + + # Apply hints overrides the decorator settings + rs = no_incremental_arg() + rs.apply_hints( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=3, last_value_func=max + ) + ) + result = list(rs) + assert result == [{"value": i} for i in range(3, 10)] + + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def with_optional_incremental_arg(incremental: Optional[dlt.sources.incremental[int]] = None): + assert incremental is not None + yield [{"value": i} for i in range(10)] + + # Decorator settings are used + result = list(with_optional_incremental_arg()) + assert result == [{"value": i} for i in range(0, 6)] + + +@pytest.mark.parametrize("use_dict", [False, True]) +def test_incremental_in_resource_decorator_default_arg(use_dict: bool) -> None: + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="value", initial_value=5, last_value_func=min + ) + ) + def with_default_incremental_arg( + incremental: dlt.sources.incremental[int] = dlt.sources.incremental( + "value", initial_value=3, last_value_func=min + ) + ): + assert incremental.last_value == initial_value + assert incremental.last_value_func == last_value_func + yield [{"value": i} for i in range(10)] + + last_value_func = max + initial_value = 4 + # Explicit argument overrides the default and decorator argument + result = list( + with_default_incremental_arg( + incremental=dlt.sources.incremental( + "value", initial_value=initial_value, last_value_func=last_value_func + ) + ) + ) + assert result == [{"value": i} for i in range(4, 10)] + + # 
Decorator param overrides function default arg + last_value_func = min + initial_value = 5 + result = list(with_default_incremental_arg()) + assert result == [{"value": i} for i in range(0, 6)] + + +@pytest.mark.parametrize("use_dict", [False, True]) +def test_incremental_table_hint_merged_columns(use_dict: bool) -> None: + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="col_a", initial_value=3, last_value_func=min + ) + ) + def some_data(): + yield [{"col_a": i, "foo": i + 2, "col_b": i + 1, "bar": i + 3} for i in range(10)] + + pipeline = dlt.pipeline(pipeline_name=uniq_id()) + pipeline.extract(some_data()) + + table_schema = pipeline.default_schema.tables["some_data"] + assert table_schema["columns"]["col_a"]["incremental"] is True + + rs = some_data() + rs.apply_hints( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="col_b", initial_value=5, last_value_func=max + ) + ) + + pipeline.extract(rs) + + table_schema_2 = pipeline.default_schema.tables["some_data"] + + # Only one column should have the hint + assert "incremental" not in table_schema_2["columns"]["col_a"] + assert table_schema_2["columns"]["col_b"]["incremental"] is True + + +@pytest.mark.parametrize("use_dict", [True, False]) +def test_incremental_column_hint_cursor_is_not_column(use_dict: bool): + @dlt.resource( + incremental=incremental_instance_or_dict( + use_dict, cursor_path="col_a|col_b", initial_value=3, last_value_func=min + ) + ) + def some_data(): + yield [{"col_a": i, "foo": i + 2, "col_b": i + 1, "bar": i + 3} for i in range(10)] + + pipeline = dlt.pipeline(pipeline_name=uniq_id()) + + pipeline.extract(some_data()) + + table_schema = pipeline.default_schema.tables["some_data"] + + for col in table_schema["columns"].values(): + assert "incremental" not in col