From bbfbb04fb916a4c44d18c2cb63f03ddb700a4312 Mon Sep 17 00:00:00 2001 From: Willi Date: Tue, 9 Jul 2024 23:00:39 +0530 Subject: [PATCH 01/32] adds failing test --- tests/extract/test_incremental.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index f4082a7d86..296522075f 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -634,6 +634,31 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): assert pip_ex.value.__context__.json_path == "item.timestamp" +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_does_not_discard_row_without_incremental_cursor(item_type: TestDataItemFormat) -> None: + last_values = [] + + data = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": None}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at")): + last_values.append(created_at.last_value) + yield from source_items + + result = list(some_data()) + assert len(result[0]) == 3 + assert last_values == [None] + + result_2 = list(some_data()) + assert len(result_2[0]) == 3 + assert last_values == [None, 2] + + def test_json_path_cursor() -> None: @dlt.resource def some_data(last_timestamp=dlt.sources.incremental("item.timestamp|modifiedAt")): From b2efc6eba7ccb2b0d048a3984e0ee794587815f2 Mon Sep 17 00:00:00 2001 From: Willi Date: Wed, 10 Jul 2024 17:10:26 +0530 Subject: [PATCH 02/32] incremental extract does not crash if one record has cursor_path = None adds more tests --- dlt/extract/incremental/transform.py | 33 ++++++---- tests/extract/test_incremental.py | 91 +++++++++++++++++++++++++--- 2 files changed, 104 insertions(+), 20 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 947e21f7b8..d8f6b79b49 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -1,4 +1,4 @@ -from datetime import datetime, date # noqa: I251 +from datetime import datetime # noqa: I251 from typing import Any, Optional, Set, Tuple, List from dlt.common.exceptions import MissingDependencyException @@ -117,18 +117,17 @@ def find_cursor_value(self, row: TDataItem) -> Any: Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict """ - row_value: Any = None + if self.cursor_path not in row.keys() and not self._compiled_cursor_path: + raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + + row_value = row.get(self.cursor_path, None) + if self._compiled_cursor_path: row_values = find_values(self._compiled_cursor_path, row) - if row_values: - row_value = row_values[0] - else: - try: - row_value = row[self.cursor_path] - except Exception: - pass - if row_value is None: - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + if not row_values: + raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + row_value = row_values[0] + return row_value def __call__( @@ -164,14 +163,22 @@ def __call__( ): return None, False, True - check_values = (row_value,) + ((last_value,) if last_value is not None else ()) + if (row_value is None and last_value is None) or ( + row_value is None and self.start_value is None + ): + # store rows with "max" values to compute hashes after processing full batch + self.last_rows = [row] + self.unique_hashes = set() 
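+            # keep the row: a None cursor value can neither advance nor fall behind last_value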
+ return row, False, False + row_value_ignored_none = (row_value,) if row_value is not None else () + check_values = (row_value_ignored_none) + ((last_value,) if last_value is not None else ()) new_value = last_value_func(check_values) # new_value is "less" or equal to last_value (the actual max) if last_value == new_value: # use func to compute row_value into last_value compatible processed_row_value = last_value_func((row_value,)) # skip the record that is not a start_value or new_value: that record was already processed - check_values = (row_value,) + ( + check_values = row_value_ignored_none + ( (self.start_value,) if self.start_value is not None else () ) new_value = last_value_func(check_values) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 296522075f..b181c70525 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -166,8 +166,9 @@ def some_data(created_at=dlt.sources.incremental("created_at")): p = dlt.pipeline(pipeline_name=uniq_id()) p.extract(some_data()) - p.extract(some_data()) + assert values == [None] + p.extract(some_data()) assert values == [None, 5] @@ -635,7 +636,7 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_does_not_discard_row_without_incremental_cursor(item_type: TestDataItemFormat) -> None: +def test_cursor_path_none_updates_incremental_cursor(item_type: TestDataItemFormat) -> None: last_values = [] data = [ @@ -648,17 +649,93 @@ def test_does_not_discard_row_without_incremental_cursor(item_type: TestDataItem @dlt.resource def some_data(created_at=dlt.sources.incremental("created_at")): last_values.append(created_at.last_value) - yield from source_items + yield source_items - result = list(some_data()) - assert len(result[0]) == 3 + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) assert last_values == [None] - result_2 = list(some_data()) - assert len(result_2[0]) == 3 + p.extract(some_data()) + assert last_values == [None, 2] + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_updates_incremental_cursor_2(item_type: TestDataItemFormat) -> None: + last_values = [] + + data = [ + {"id": 1, "created_at": None}, + {"id": 2, "created_at": 2}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at")): + last_values.append(created_at.last_value) + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + assert last_values == [None] + + p.extract(some_data()) assert last_values == [None, 2] +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_does_not_discard_row(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": None}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at")): + yield source_items + + result = list(some_data()) + if item_type == "object": + assert len(result) == 3 + else: + assert len(result[0]) == 3 + + result_2 = list(some_data()) + if item_type == "object": + assert len(result_2) == 3 + else: + assert len(result_2[0]) == 3 + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_does_not_discard_row_2(item_type: 
TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": None}, + {"id": 2, "created_at": 1}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at")): + yield source_items + + result = list(some_data()) + if item_type == "object": + assert len(result) == 3 + else: + assert len(result[0]) == 3 + + result_2 = list(some_data()) + if item_type == "object": + assert len(result_2) == 3 + else: + assert len(result_2[0]) == 3 + + + def test_json_path_cursor() -> None: @dlt.resource def some_data(last_timestamp=dlt.sources.incremental("item.timestamp|modifiedAt")): From 99327f97695f22fc909b33560ebeba3837ecdf64 Mon Sep 17 00:00:00 2001 From: Willi Date: Thu, 11 Jul 2024 19:38:52 +0530 Subject: [PATCH 03/32] implements ability to raise on cursor_path = None or to include the row. Fails for pandas & pyarrow implements handling of cursor_path value is None for nested JSON paths --- dlt/extract/incremental/__init__.py | 14 +- dlt/extract/incremental/exceptions.py | 13 +- dlt/extract/incremental/transform.py | 26 ++- dlt/extract/incremental/typing.py | 3 +- .../docs/general-usage/incremental-loading.md | 4 +- tests/extract/test_incremental.py | 180 +++++++++++++----- 6 files changed, 185 insertions(+), 55 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index c1117370b5..b4ff44f8e2 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -35,7 +35,12 @@ IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, ) -from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc +from dlt.extract.incremental.typing import ( + IncrementalColumnState, + TCursorValue, + LastValueFunc, + OnCursorValueNone, +) from dlt.extract.pipe import Pipe from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform from dlt.extract.incremental.transform import ( @@ -81,7 +86,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa >>> info = p.run(r, destination="duckdb") Args: - cursor_path: The name or a JSON path to an cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database. + cursor_path: The name or a JSON path to a cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database. initial_value: Optional value used for `last_value` when no state is available, e.g. on the first run of the pipeline. If not provided `last_value` will be `None` on the first run. last_value_func: Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor vals from currently processing items. Default is `max` primary_key: Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass empty tuple to disable unique checks @@ -95,6 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class. The values passed explicitly to Incremental will be ignored. 
Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded
+        on_cursor_value_none: Specify what happens when a record has `None` at the cursor_path. Accepts "raise" (default), which raises IncrementalCursorPathHasValueNone, and "include", which keeps such records
     """

     # this is config/dataclass so declare members
@@ -104,6 +110,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
     end_value: Optional[Any] = None
     row_order: Optional[TSortOrder] = None
     allow_external_schedulers: bool = False
+    on_cursor_value_none: OnCursorValueNone = "raise"

     # incremental acting as empty
     EMPTY: ClassVar["Incremental[Any]"] = None
@@ -118,6 +125,7 @@ def __init__(
         end_value: Optional[TCursorValue] = None,
         row_order: Optional[TSortOrder] = None,
         allow_external_schedulers: bool = False,
+        on_cursor_value_none: OnCursorValueNone = "raise",
     ) -> None:
         # make sure that path is valid
         if cursor_path:
@@ -133,6 +141,7 @@ def __init__(
         self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key
         self.row_order = row_order
         self.allow_external_schedulers = allow_external_schedulers
+        self.on_cursor_value_none = on_cursor_value_none

         self._cached_state: IncrementalColumnState = None
         """State dictionary cached on first access"""
@@ -171,6 +180,7 @@ def _make_transforms(self) -> None:
             self.last_value_func,
             self._primary_key,
             set(self._cached_state["unique_hashes"]),
+            self.on_cursor_value_none,
         )

     @classmethod
diff --git a/dlt/extract/incremental/exceptions.py b/dlt/extract/incremental/exceptions.py
index e318a028dc..1eebccb37c 100644
--- a/dlt/extract/incremental/exceptions.py
+++ b/dlt/extract/incremental/exceptions.py
@@ -8,7 +8,18 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N
         self.json_path = json_path
         self.item = item
         msg = (
             msg
-            or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database."
+            or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in the database."
         )
         super().__init__(pipe_name, msg)


+class IncrementalCursorPathHasValueNone(PipeException):
+    def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None:
+        self.json_path = json_path
+        self.item = item
+        msg = (
+            msg
+            or f"Cursor element with JSON path {json_path} has the value `None` in extracted data item. All data items must contain a value != None. 
Construct the incremental with on_cursor_value_none='include' if you want to include such rows" ) super().__init__(pipe_name, msg) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index d8f6b79b49..014b4fdf91 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -10,8 +10,9 @@ from dlt.extract.incremental.exceptions import ( IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, + IncrementalCursorPathHasValueNone, ) -from dlt.extract.incremental.typing import TCursorValue, LastValueFunc +from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueNone from dlt.extract.utils import resolve_column_value from dlt.extract.items import TTableHintTemplate from dlt.common.schema.typing import TColumnNames @@ -54,6 +55,7 @@ def __init__( last_value_func: LastValueFunc[TCursorValue], primary_key: Optional[TTableHintTemplate[TColumnNames]], unique_hashes: Set[str], + on_cursor_value_none: OnCursorValueNone = "raise", ) -> None: self.resource_name = resource_name self.cursor_path = cursor_path @@ -66,6 +68,7 @@ def __init__( self.primary_key = primary_key self.unique_hashes = unique_hashes self.start_unique_hashes = set(unique_hashes) + self.on_cursor_value_none = on_cursor_value_none # compile jsonpath self._compiled_cursor_path = compile_path(cursor_path) @@ -120,14 +123,25 @@ def find_cursor_value(self, row: TDataItem) -> Any: if self.cursor_path not in row.keys() and not self._compiled_cursor_path: raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + row_value = self._value_at_cursor_path(row) + + if self.on_cursor_value_none == "raise" and row_value is None: + raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) + + return row_value + + def _value_at_cursor_path(self, row: TDataItem) -> Any: row_value = row.get(self.cursor_path, None) if self._compiled_cursor_path: - row_values = find_values(self._compiled_cursor_path, row) - if not row_values: + cursor_values = find_values(self._compiled_cursor_path, row) + if cursor_values == [] and self.on_cursor_value_none == "raise": raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - row_value = row_values[0] - + elif None in cursor_values and self.on_cursor_value_none == "raise": + raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) + else: + # ignores the other found values, e.g. 
when the path is $data.items[*].created_at + row_value = cursor_values[0] return row_value def __call__( @@ -136,7 +150,7 @@ def __call__( ) -> Tuple[Optional[TDataItem], bool, bool]: """ Returns: - Tuple (row, start_out_of_range, end_out_of_range) where row is either the data item or `None` if it is completely filtered out + Tuple (row, is_below_initial_value, is_above_end_value) where row is either the data item or `None` if it is completely filtered out """ if row is None: return row, False, False diff --git a/dlt/extract/incremental/typing.py b/dlt/extract/incremental/typing.py index 9cec97d34d..14a1435a8a 100644 --- a/dlt/extract/incremental/typing.py +++ b/dlt/extract/incremental/typing.py @@ -1,8 +1,9 @@ -from typing import TypedDict, Optional, Any, List, TypeVar, Callable, Sequence +from typing import TypedDict, Optional, Any, List, Literal, TypeVar, Callable, Sequence TCursorValue = TypeVar("TCursorValue", bound=Any) LastValueFunc = Callable[[Sequence[TCursorValue]], Any] +OnCursorValueNone = Literal["raise", "include"] class IncrementalColumnState(TypedDict): diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index b130f7a4f5..17b7f6e3b2 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -660,7 +660,7 @@ than `end_value`. :::caution In rare cases when you use Incremental with a transformer, `dlt` will not be able to automatically close -generator associated with a row that is out of range. You can still use still call `can_close()` method on +generator associated with a row that is out of range. You can still call the `can_close()` method on incremental and exit yield loop when true. ::: @@ -1133,4 +1133,4 @@ sources: } ``` -Verify that the `last_value` is updated between pipeline runs. \ No newline at end of file +Verify that the `last_value` is updated between pipeline runs. 
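
With the option, the exception, and the transform wired up at this point in the patch, the call pattern looks as follows. This is a minimal sketch mirroring the tests further down; the resource and field names are illustrative, not taken from any file in the diffs:

```py
import dlt

@dlt.resource
def some_data(
    created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")
):
    # with "include", rows whose cursor field is None pass the incremental
    # filter unchanged; only non-None values advance last_value
    yield [
        {"id": 1, "created_at": 1},
        {"id": 2, "created_at": None},
        {"id": 3, "created_at": 2},
    ]

# all three rows are emitted; last_value ends at 2
print(list(some_data()))
```
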
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index b181c70525..bdd867c65f 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -32,6 +32,7 @@ from dlt.extract.incremental.exceptions import ( IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, + IncrementalCursorPathHasValueNone, ) from dlt.pipeline.exceptions import PipelineStepFailed @@ -636,9 +637,28 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_updates_incremental_cursor(item_type: TestDataItemFormat) -> None: - last_values = [] +def test_cursor_path_none_updates_incremental_cursor_1(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": None}, + {"id": 2, "created_at": 1}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 2 + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_updates_incremental_cursor_2(item_type: TestDataItemFormat) -> None: data = [ {"id": 1, "created_at": 1}, {"id": 2, "created_at": None}, @@ -647,44 +667,119 @@ def test_cursor_path_none_updates_incremental_cursor(item_type: TestDataItemForm source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at")): - last_values.append(created_at.last_value) + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) p.extract(some_data()) - assert last_values == [None] - - p.extract(some_data()) - assert last_values == [None, 2] + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 2 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_updates_incremental_cursor_2(item_type: TestDataItemFormat) -> None: - last_values = [] - +def test_cursor_path_none_updates_incremental_cursor_3(item_type: TestDataItemFormat) -> None: data = [ - {"id": 1, "created_at": None}, + {"id": 1, "created_at": 1}, {"id": 2, "created_at": 2}, - {"id": 3, "created_at": 2}, + {"id": 3, "created_at": None}, ] source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at")): - last_values.append(created_at.last_value) + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) p.extract(some_data()) - assert last_values == [None] + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 2 - p.extract(some_data()) - assert last_values == [None, 2] + +def test_cursor_path_none_can_raise_on_none() -> None: + # No nested json path support for pandas and arrow. 
See test_nested_cursor_path_arrow_fails + source_items = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": None}, + {"id": 3, "created_at": 2}, + ] + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="raise")): + yield source_items + + with pytest.raises(IncrementalCursorPathHasValueNone) as py_ex: + list(some_data()) + assert py_ex.value.json_path == "created_at" + + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as pip_ex: + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + + assert isinstance(pip_ex.value.__context__, IncrementalCursorPathHasValueNone) + assert pip_ex.value.__context__.json_path == "created_at" + + +def test_cursor_path_none_nested_can_raise_on_none_1() -> None: + # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("data.items[0].created_at", on_cursor_value_none="raise") + ): + yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} + + with pytest.raises(IncrementalCursorPathHasValueNone) as e: + list(some_data()) + assert e.value.json_path == "data.items[0].created_at" + + +def test_cursor_path_none_nested_can_raise_on_none_2() -> None: + # No pandas and arrow. See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("data.items[*].created_at", on_cursor_value_none="raise") + ): + yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} + + with pytest.raises(IncrementalCursorPathHasValueNone) as e: + list(some_data()) + assert e.value.json_path == "data.items[*].created_at" + + +def test_cursor_path_none_nested_can_include_on_none_1() -> None: + # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "data.items[*].created_at", on_cursor_value_none="include" + ) + ): + yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} + + results = list(some_data()) + assert results[0]["data"]["items"] == [{"created_at": None}, {"created_at": 1}] + + +def test_cursor_path_none_nested_can_include_on_none_2() -> None: + # No nested json path support for pandas and arrow. 
See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "data.items[0].created_at", on_cursor_value_none="include" + ) + ): + yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} + + results = list(some_data()) + assert results[0]["data"]["items"] == [{"created_at": None}, {"created_at": 1}] @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_does_not_discard_row(item_type: TestDataItemFormat) -> None: +def test_cursor_path_none_can_include_on_none_1(item_type: TestDataItemFormat) -> None: data = [ {"id": 1, "created_at": 1}, {"id": 2, "created_at": None}, @@ -693,24 +788,15 @@ def test_cursor_path_none_does_not_discard_row(item_type: TestDataItemFormat) -> source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at")): + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): yield source_items - result = list(some_data()) - if item_type == "object": - assert len(result) == 3 - else: - assert len(result[0]) == 3 - - result_2 = list(some_data()) - if item_type == "object": - assert len(result_2) == 3 - else: - assert len(result_2[0]) == 3 + result = data_item_to_list(item_type, list(some_data())) + assert data_item_length(result) == 3 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_does_not_discard_row_2(item_type: TestDataItemFormat) -> None: +def test_cursor_path_none_can_include_on_none_2(item_type: TestDataItemFormat) -> None: data = [ {"id": 1, "created_at": None}, {"id": 2, "created_at": 1}, @@ -719,21 +805,29 @@ def test_cursor_path_none_does_not_discard_row_2(item_type: TestDataItemFormat) source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at")): + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): yield source_items - result = list(some_data()) - if item_type == "object": - assert len(result) == 3 - else: - assert len(result[0]) == 3 + result = data_item_to_list(item_type, list(some_data())) + assert data_item_length(result) == 3 - result_2 = list(some_data()) - if item_type == "object": - assert len(result_2) == 3 - else: - assert len(result_2[0]) == 3 +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_can_include_on_none_3(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": 1}, + {"id": 3, "created_at": None}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): + yield source_items + + result = list(some_data()) + values = data_item_to_list(item_type, result) + assert data_item_length(values) == 3 def test_json_path_cursor() -> None: From bfda7de654ca9b6a45a73ef391cbd57619f1f331 Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 12 Jul 2024 16:35:58 +0530 Subject: [PATCH 04/32] renames start_out_of_range and end_out_of_range to is_below_initial_value and is_above_end_value --- dlt/extract/incremental/__init__.py | 10 +++++----- dlt/extract/incremental/transform.py | 14 +++++++------- .../incremental_loading/incremental_loading.py | 2 +- docs/examples/qdrant_zendesk/qdrant_zendesk.py | 2 +- .../docs/general-usage/incremental-loading.md | 4 ++-- 
docs/website/docs/getting-started-snippets.py | 5 ++--- .../tutorial/load-data-from-an-api-snippets.py | 5 ++--- .../dispatch-to-multiple-tables.md | 2 +- tests/extract/test_incremental.py | 18 +++++++++--------- 9 files changed, 30 insertions(+), 32 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index b4ff44f8e2..7432778719 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -147,9 +147,9 @@ def __init__( """State dictionary cached on first access""" super().__init__(lambda x: x) # TODO: - self.end_out_of_range: bool = False + self.is_above_end_value: bool = False """Becomes true on the first item that is out of range of `end_value`. I.e. when using `max` function this means a value that is equal or higher""" - self.start_out_of_range: bool = False + self.is_below_initial_value: bool = False """Becomes true on the first item that is out of range of `start_value`. I.e. when using `max` this is a value that is lower than `start_value`""" self._transformers: Dict[str, IncrementalTransform] = {} @@ -336,7 +336,7 @@ def last_value(self) -> Optional[TCursorValue]: def _transform_item( self, transformer: IncrementalTransform, row: TDataItem ) -> Optional[TDataItem]: - row, self.start_out_of_range, self.end_out_of_range = transformer(row) + row, self.is_below_initial_value, self.is_above_end_value = transformer(row) # if we know that rows are ordered we can close the generator automatically # mind that closing pipe will not immediately close processing. it only closes the # generator so this page will be fully processed @@ -451,9 +451,9 @@ def can_close(self) -> bool: # ordered ascending, check if we cross upper bound return ( self.row_order == "asc" - and self.end_out_of_range + and self.is_above_end_value or self.row_order == "desc" - and self.start_out_of_range + and self.is_below_initial_value ) def __str__(self) -> str: diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 014b4fdf91..a1c2042066 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -276,9 +276,9 @@ def __call__( elif primary_key is None: unique_columns = tbl.schema.names - start_out_of_range = end_out_of_range = False + is_below_initial_value = is_above_end_value = False if not tbl: # row is None or empty arrow table - return tbl, start_out_of_range, end_out_of_range + return tbl, is_below_initial_value, is_above_end_value if self.last_value_func is max: compute = pa.compute.max @@ -319,13 +319,13 @@ def __call__( tbl = tbl.filter(end_compare(tbl[cursor_path], end_value_scalar)) # Is max row value higher than end value? # NOTE: pyarrow bool *always* evaluates to python True. 
`as_py()` is necessary - end_out_of_range = not end_compare(row_value_scalar, end_value_scalar).as_py() + is_above_end_value = not end_compare(row_value_scalar, end_value_scalar).as_py() if self.start_value is not None: start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type) # Remove rows lower or equal than the last start value keep_filter = last_value_compare(tbl[cursor_path], start_value_scalar) - start_out_of_range = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py()) + is_below_initial_value = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py()) tbl = tbl.filter(keep_filter) if not self.deduplication_disabled: # Deduplicate after filtering old values @@ -373,11 +373,11 @@ def __call__( ) if len(tbl) == 0: - return None, start_out_of_range, end_out_of_range + return None, is_below_initial_value, is_above_end_value try: tbl = pyarrow.remove_columns(tbl, ["_dlt_index"]) except KeyError: pass if is_pandas: - return tbl.to_pandas(), start_out_of_range, end_out_of_range - return tbl, start_out_of_range, end_out_of_range + return tbl.to_pandas(), is_below_initial_value, is_above_end_value + return tbl, is_below_initial_value, is_above_end_value diff --git a/docs/examples/incremental_loading/incremental_loading.py b/docs/examples/incremental_loading/incremental_loading.py index f1de4eecfe..47294992c9 100644 --- a/docs/examples/incremental_loading/incremental_loading.py +++ b/docs/examples/incremental_loading/incremental_loading.py @@ -89,7 +89,7 @@ def ticket_events( yield page # stop loading when using end_value and end is reached. # unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves - if timestamp.end_out_of_range: + if timestamp.is_above_end_value: return return ticket_events diff --git a/docs/examples/qdrant_zendesk/qdrant_zendesk.py b/docs/examples/qdrant_zendesk/qdrant_zendesk.py index 9b6fbee150..ba5154924d 100644 --- a/docs/examples/qdrant_zendesk/qdrant_zendesk.py +++ b/docs/examples/qdrant_zendesk/qdrant_zendesk.py @@ -95,7 +95,7 @@ def tickets_data( # stop loading when using end_value and end is reached. # unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves - if updated_at.end_out_of_range: + if updated_at.is_above_end_value: return return tickets_data diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 17b7f6e3b2..1a4f749126 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -665,7 +665,7 @@ incremental and exit yield loop when true. ::: :::tip -The `dlt.sources.incremental` instance provides `start_out_of_range` and `end_out_of_range` +The `dlt.sources.incremental` instance provides `is_below_initial_value` and `is_above_end_value` attributes which are set when the resource yields an element with a higher/lower cursor value than the initial or end values. 
If you do not want `dlt` to stop processing automatically and instead to handle such events yourself, do not specify `row_order`: ```py @@ -684,7 +684,7 @@ def tickets( ): yield page # Stop loading when we reach the end value - if updated_at.end_out_of_range: + if updated_at.is_above_end_value: return ``` diff --git a/docs/website/docs/getting-started-snippets.py b/docs/website/docs/getting-started-snippets.py index eb00df9986..b9807fbe73 100644 --- a/docs/website/docs/getting-started-snippets.py +++ b/docs/website/docs/getting-started-snippets.py @@ -1,4 +1,3 @@ -import os from tests.pipeline.utils import assert_load_info @@ -159,7 +158,7 @@ def get_issues( # stop requesting pages if the last element was already older than initial value # note: incremental will skip those items anyway, we just do not want to use the api limits - if created_at.start_out_of_range: + if created_at.is_below_initial_value: break # get next page @@ -241,7 +240,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")): # stop requesting pages if the last element was already older than initial value # note: incremental will skip those items anyway, we just do not want to use the api limits - if last_created_at.start_out_of_range: + if last_created_at.is_below_initial_value: break # get next page diff --git a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py index d53af9e3d9..b380229cca 100644 --- a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py +++ b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py @@ -1,4 +1,3 @@ -import os from tests.pipeline.utils import assert_load_info @@ -71,7 +70,7 @@ def get_issues( # older than initial value # Note: incremental will skip those items anyway, we just # do not want to use the api limits - if created_at.start_out_of_range: + if created_at.is_below_initial_value: break # get next page @@ -160,7 +159,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")): # stop requesting pages if the last element was already older than initial value # note: incremental will skip those items anyway, we just do not want to use the api limits - if last_created_at.start_out_of_range: + if last_created_at.is_below_initial_value: break # get next page diff --git a/docs/website/docs/walkthroughs/dispatch-to-multiple-tables.md b/docs/website/docs/walkthroughs/dispatch-to-multiple-tables.md index 0e342a3fea..5bd0e21337 100644 --- a/docs/website/docs/walkthroughs/dispatch-to-multiple-tables.md +++ b/docs/website/docs/walkthroughs/dispatch-to-multiple-tables.md @@ -38,7 +38,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")): # the initial value # note: incremental will skip those items anyway, we just do not # want to use the api limits - if last_created_at.start_out_of_range: + if last_created_at.is_below_initial_value: break # get next page diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index bdd867c65f..73966c0f1a 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -1696,7 +1696,7 @@ def custom_last_value(items): @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_out_of_range_flags(item_type: TestDataItemFormat) -> None: - """Test incremental.start_out_of_range / end_out_of_range flags are set when items are filtered out""" + """Test incremental.is_below_initial_value / is_above_end_value flags are set when items are filtered out""" 
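+    # each resource below asserts the flag values inline, while its items flow through the incremental transform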
@dlt.resource def descending( @@ -1709,9 +1709,9 @@ def descending( yield data_to_item_format(item_type, data) # Assert flag is set only on the first item < initial_value if all(item > 9 for item in chunk): - assert updated_at.start_out_of_range is False + assert updated_at.is_below_initial_value is False else: - assert updated_at.start_out_of_range is True + assert updated_at.is_below_initial_value is True return @dlt.resource @@ -1725,9 +1725,9 @@ def ascending( yield data_to_item_format(item_type, data) # Flag is set only when end_value is reached if all(item < 45 for item in chunk): - assert updated_at.end_out_of_range is False + assert updated_at.is_above_end_value is False else: - assert updated_at.end_out_of_range is True + assert updated_at.is_above_end_value is True return @dlt.resource @@ -1740,9 +1740,9 @@ def descending_single_item( data = [{"updated_at": i}] yield from data_to_item_format(item_type, data) if i >= 10: - assert updated_at.start_out_of_range is False + assert updated_at.is_below_initial_value is False else: - assert updated_at.start_out_of_range is True + assert updated_at.is_below_initial_value is True return @dlt.resource @@ -1755,9 +1755,9 @@ def ascending_single_item( data = [{"updated_at": i}] yield from data_to_item_format(item_type, data) if i < 22: - assert updated_at.end_out_of_range is False + assert updated_at.is_above_end_value is False else: - assert updated_at.end_out_of_range is True + assert updated_at.is_above_end_value is True return pipeline = dlt.pipeline(pipeline_name="incremental_" + uniq_id(), destination="duckdb") From c1e765f900e3bad9874e351f98638c0905865d59 Mon Sep 17 00:00:00 2001 From: Willi Date: Mon, 22 Jul 2024 18:15:46 +0530 Subject: [PATCH 05/32] test that add_map can be used to transform items before the incremental function is called --- tests/extract/test_incremental.py | 62 +++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 73966c0f1a..cccf2be712 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -44,6 +44,8 @@ ALL_TEST_DATA_ITEM_FORMATS, ) +import pyarrow as pa + @pytest.fixture(autouse=True) def switch_to_fifo(): @@ -830,6 +832,66 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n assert data_item_length(values) == 3 +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_set_default_value_for_incremental_cursor(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 2, "created_at": 4, "updated_at": None}, + {"id": 3, "created_at": 3, "updated_at": 3}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("updated_at")): + yield source_items + + def set_default_updated_at(record): + if record.get("updated_at") is None: + record["updated_at"] = record.get("created_at", pendulum.now().int_timestamp) + return record + + def set_default_updated_at_pandas(df): + df["updated_at"] = df["updated_at"].fillna(df["created_at"]) + return df + + def set_default_updated_at_arrow(records): + updated_at_is_null = pa.compute.is_null(records.column("updated_at")) + updated_at_filled = pa.compute.if_else( + updated_at_is_null, records.column("created_at"), records.column("updated_at") + ) + if item_type == "arrow-table": + records = records.set_column( + records.schema.get_field_index("updated_at"), + pa.field("updated_at", 
records.column("updated_at").type), + updated_at_filled, + ) + elif item_type == "arrow-batch": + columns = [records.column(i) for i in range(records.num_columns)] + columns[2] = updated_at_filled + records = pa.RecordBatch.from_arrays(columns, schema=records.schema) + return records + + if item_type == "object": + func = set_default_updated_at + elif item_type == "pandas": + func = set_default_updated_at_pandas + elif item_type in ["arrow-table", "arrow-batch"]: + func = set_default_updated_at_arrow + + result = list(some_data().add_map(func, insert_at=1)) + values = data_item_to_list(item_type, result) + assert data_item_length(values) == 3 + assert values[1]["updated_at"] == 4 + + # same for pipeline run + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data().add_map(func, insert_at=1)) + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "updated_at" + ] + assert s["last_value"] == 4 + + def test_json_path_cursor() -> None: @dlt.resource def some_data(last_timestamp=dlt.sources.incremental("item.timestamp|modifiedAt")): From 147772ec761a1bdafc4d3c7c40a0db979e632c21 Mon Sep 17 00:00:00 2001 From: Willi Date: Wed, 31 Jul 2024 14:14:40 +0530 Subject: [PATCH 06/32] Reverts renaming of start_out_of_range and end_out_of_range to is_below_initial_value and is_above_end_value This reverts commit 733d1c0c7070ca63a1c7727e0bdb5eb25dccd54e. --- dlt/extract/incremental/__init__.py | 10 +++++----- dlt/extract/incremental/transform.py | 16 ++++++++-------- .../incremental_loading/incremental_loading.py | 2 +- docs/examples/qdrant_zendesk/qdrant_zendesk.py | 2 +- .../docs/general-usage/incremental-loading.md | 4 ++-- docs/website/docs/getting-started-snippets.py | 5 +++-- .../tutorial/load-data-from-an-api-snippets.py | 5 +++-- .../dispatch-to-multiple-tables.md | 2 +- tests/extract/test_incremental.py | 18 +++++++++--------- 9 files changed, 33 insertions(+), 31 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 7432778719..b4ff44f8e2 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -147,9 +147,9 @@ def __init__( """State dictionary cached on first access""" super().__init__(lambda x: x) # TODO: - self.is_above_end_value: bool = False + self.end_out_of_range: bool = False """Becomes true on the first item that is out of range of `end_value`. I.e. when using `max` function this means a value that is equal or higher""" - self.is_below_initial_value: bool = False + self.start_out_of_range: bool = False """Becomes true on the first item that is out of range of `start_value`. I.e. when using `max` this is a value that is lower than `start_value`""" self._transformers: Dict[str, IncrementalTransform] = {} @@ -336,7 +336,7 @@ def last_value(self) -> Optional[TCursorValue]: def _transform_item( self, transformer: IncrementalTransform, row: TDataItem ) -> Optional[TDataItem]: - row, self.is_below_initial_value, self.is_above_end_value = transformer(row) + row, self.start_out_of_range, self.end_out_of_range = transformer(row) # if we know that rows are ordered we can close the generator automatically # mind that closing pipe will not immediately close processing. 
it only closes the # generator so this page will be fully processed @@ -451,9 +451,9 @@ def can_close(self) -> bool: # ordered ascending, check if we cross upper bound return ( self.row_order == "asc" - and self.is_above_end_value + and self.end_out_of_range or self.row_order == "desc" - and self.is_below_initial_value + and self.start_out_of_range ) def __str__(self) -> str: diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index a1c2042066..7acbb9ef29 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -150,7 +150,7 @@ def __call__( ) -> Tuple[Optional[TDataItem], bool, bool]: """ Returns: - Tuple (row, is_below_initial_value, is_above_end_value) where row is either the data item or `None` if it is completely filtered out + Tuple (row, start_out_of_range, end_out_of_range) where row is either the data item or `None` if it is completely filtered out """ if row is None: return row, False, False @@ -276,9 +276,9 @@ def __call__( elif primary_key is None: unique_columns = tbl.schema.names - is_below_initial_value = is_above_end_value = False + start_out_of_range = end_out_of_range = False if not tbl: # row is None or empty arrow table - return tbl, is_below_initial_value, is_above_end_value + return tbl, start_out_of_range, end_out_of_range if self.last_value_func is max: compute = pa.compute.max @@ -319,13 +319,13 @@ def __call__( tbl = tbl.filter(end_compare(tbl[cursor_path], end_value_scalar)) # Is max row value higher than end value? # NOTE: pyarrow bool *always* evaluates to python True. `as_py()` is necessary - is_above_end_value = not end_compare(row_value_scalar, end_value_scalar).as_py() + end_out_of_range = not end_compare(row_value_scalar, end_value_scalar).as_py() if self.start_value is not None: start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type) # Remove rows lower or equal than the last start value keep_filter = last_value_compare(tbl[cursor_path], start_value_scalar) - is_below_initial_value = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py()) + start_out_of_range = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py()) tbl = tbl.filter(keep_filter) if not self.deduplication_disabled: # Deduplicate after filtering old values @@ -373,11 +373,11 @@ def __call__( ) if len(tbl) == 0: - return None, is_below_initial_value, is_above_end_value + return None, start_out_of_range, end_out_of_range try: tbl = pyarrow.remove_columns(tbl, ["_dlt_index"]) except KeyError: pass if is_pandas: - return tbl.to_pandas(), is_below_initial_value, is_above_end_value - return tbl, is_below_initial_value, is_above_end_value + return tbl.to_pandas(), start_out_of_range, end_out_of_range + return tbl, start_out_of_range, end_out_of_range diff --git a/docs/examples/incremental_loading/incremental_loading.py b/docs/examples/incremental_loading/incremental_loading.py index 47294992c9..f1de4eecfe 100644 --- a/docs/examples/incremental_loading/incremental_loading.py +++ b/docs/examples/incremental_loading/incremental_loading.py @@ -89,7 +89,7 @@ def ticket_events( yield page # stop loading when using end_value and end is reached. 
# unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves - if timestamp.is_above_end_value: + if timestamp.end_out_of_range: return return ticket_events diff --git a/docs/examples/qdrant_zendesk/qdrant_zendesk.py b/docs/examples/qdrant_zendesk/qdrant_zendesk.py index ba5154924d..9b6fbee150 100644 --- a/docs/examples/qdrant_zendesk/qdrant_zendesk.py +++ b/docs/examples/qdrant_zendesk/qdrant_zendesk.py @@ -95,7 +95,7 @@ def tickets_data( # stop loading when using end_value and end is reached. # unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves - if updated_at.is_above_end_value: + if updated_at.end_out_of_range: return return tickets_data diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 1a4f749126..17b7f6e3b2 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -665,7 +665,7 @@ incremental and exit yield loop when true. ::: :::tip -The `dlt.sources.incremental` instance provides `is_below_initial_value` and `is_above_end_value` +The `dlt.sources.incremental` instance provides `start_out_of_range` and `end_out_of_range` attributes which are set when the resource yields an element with a higher/lower cursor value than the initial or end values. If you do not want `dlt` to stop processing automatically and instead to handle such events yourself, do not specify `row_order`: ```py @@ -684,7 +684,7 @@ def tickets( ): yield page # Stop loading when we reach the end value - if updated_at.is_above_end_value: + if updated_at.end_out_of_range: return ``` diff --git a/docs/website/docs/getting-started-snippets.py b/docs/website/docs/getting-started-snippets.py index b9807fbe73..eb00df9986 100644 --- a/docs/website/docs/getting-started-snippets.py +++ b/docs/website/docs/getting-started-snippets.py @@ -1,3 +1,4 @@ +import os from tests.pipeline.utils import assert_load_info @@ -158,7 +159,7 @@ def get_issues( # stop requesting pages if the last element was already older than initial value # note: incremental will skip those items anyway, we just do not want to use the api limits - if created_at.is_below_initial_value: + if created_at.start_out_of_range: break # get next page @@ -240,7 +241,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")): # stop requesting pages if the last element was already older than initial value # note: incremental will skip those items anyway, we just do not want to use the api limits - if last_created_at.is_below_initial_value: + if last_created_at.start_out_of_range: break # get next page diff --git a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py index b380229cca..d53af9e3d9 100644 --- a/docs/website/docs/tutorial/load-data-from-an-api-snippets.py +++ b/docs/website/docs/tutorial/load-data-from-an-api-snippets.py @@ -1,3 +1,4 @@ +import os from tests.pipeline.utils import assert_load_info @@ -70,7 +71,7 @@ def get_issues( # older than initial value # Note: incremental will skip those items anyway, we just # do not want to use the api limits - if created_at.is_below_initial_value: + if created_at.start_out_of_range: break # get next page @@ -159,7 +160,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")): # stop requesting pages if the last element was already older than initial value # note: incremental 
will skip those items anyway, we just do not want to use the api limits - if last_created_at.is_below_initial_value: + if last_created_at.start_out_of_range: break # get next page diff --git a/docs/website/docs/walkthroughs/dispatch-to-multiple-tables.md b/docs/website/docs/walkthroughs/dispatch-to-multiple-tables.md index 5bd0e21337..0e342a3fea 100644 --- a/docs/website/docs/walkthroughs/dispatch-to-multiple-tables.md +++ b/docs/website/docs/walkthroughs/dispatch-to-multiple-tables.md @@ -38,7 +38,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")): # the initial value # note: incremental will skip those items anyway, we just do not # want to use the api limits - if last_created_at.is_below_initial_value: + if last_created_at.start_out_of_range: break # get next page diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index cccf2be712..55dd96b8dc 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -1758,7 +1758,7 @@ def custom_last_value(items): @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_out_of_range_flags(item_type: TestDataItemFormat) -> None: - """Test incremental.is_below_initial_value / is_above_end_value flags are set when items are filtered out""" + """Test incremental.start_out_of_range / end_out_of_range flags are set when items are filtered out""" @dlt.resource def descending( @@ -1771,9 +1771,9 @@ def descending( yield data_to_item_format(item_type, data) # Assert flag is set only on the first item < initial_value if all(item > 9 for item in chunk): - assert updated_at.is_below_initial_value is False + assert updated_at.start_out_of_range is False else: - assert updated_at.is_below_initial_value is True + assert updated_at.start_out_of_range is True return @dlt.resource @@ -1787,9 +1787,9 @@ def ascending( yield data_to_item_format(item_type, data) # Flag is set only when end_value is reached if all(item < 45 for item in chunk): - assert updated_at.is_above_end_value is False + assert updated_at.end_out_of_range is False else: - assert updated_at.is_above_end_value is True + assert updated_at.end_out_of_range is True return @dlt.resource @@ -1802,9 +1802,9 @@ def descending_single_item( data = [{"updated_at": i}] yield from data_to_item_format(item_type, data) if i >= 10: - assert updated_at.is_below_initial_value is False + assert updated_at.start_out_of_range is False else: - assert updated_at.is_below_initial_value is True + assert updated_at.start_out_of_range is True return @dlt.resource @@ -1817,9 +1817,9 @@ def ascending_single_item( data = [{"updated_at": i}] yield from data_to_item_format(item_type, data) if i < 22: - assert updated_at.is_above_end_value is False + assert updated_at.end_out_of_range is False else: - assert updated_at.is_above_end_value is True + assert updated_at.end_out_of_range is True return pipeline = dlt.pipeline(pipeline_name="incremental_" + uniq_id(), destination="duckdb") From 36957fa2783c48aa4ea450ba0eb8b2e9281e122a Mon Sep 17 00:00:00 2001 From: Willi Date: Wed, 31 Jul 2024 15:30:56 +0530 Subject: [PATCH 07/32] Documents handling NULL values at incremental cursor (cherry picked from commit 95db6c729d08062355237f3251b6fa6b60b74826 and 6d682da3995295f5c4fb6e6f651d73c363b860ee) incorporates docs changes from review --- .../docs/general-usage/incremental-loading.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/website/docs/general-usage/incremental-loading.md 
b/docs/website/docs/general-usage/incremental-loading.md
index 17b7f6e3b2..431cc3532d 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -894,6 +894,19 @@ def some_data(updated_at=dlt.sources.incremental("updated_at")):
 list(some_data())
 ```

+If you want to load data that includes `None` values, there are two options:
+
+1. Transform the values at the incremental cursor to a value different from `None` before the incremental object is called. [See docs below](#transform-records-before-incremental-processing)
+2. Configure the incremental load to tolerate `None` values using `incremental(..., on_cursor_value_none="include")`.
+
+Example:
+```py
+@dlt.resource
+def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")):
+    yield [{"id": 1, "created_at": 1}, {"id": 2, "created_at": None}]
+```
+
+### Transform records before incremental processing
 If you want to load data that includes `None` values you can transform the records before the incremental processing.
 You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data).

From bd46dcf693bcbf3523ed3399a18fe69eed0a2e52 Mon Sep 17 00:00:00 2001
From: Willi
Date: Thu, 1 Aug 2024 16:07:48 +0530
Subject: [PATCH 08/32] tests that even records with None at cursor path are loaded

---
 tests/extract/test_incremental.py | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 55dd96b8dc..7f607ca725 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -639,7 +639,7 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")):


 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
-def test_cursor_path_none_updates_incremental_cursor_1(item_type: TestDataItemFormat) -> None:
+def test_cursor_path_none_includes_records_and_updates_incremental_cursor_1(item_type: TestDataItemFormat) -> None:
     data = [
         {"id": 1, "created_at": None},
         {"id": 2, "created_at": 1},
@@ -652,7 +652,11 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n
     yield source_items

     p = dlt.pipeline(pipeline_name=uniq_id())
-    p.extract(some_data())
+    extract_info = p.extract(some_data())
+    load_id = extract_info.loads_ids[0]
+    metrics = extract_info.metrics[load_id][0]["table_metrics"]["some_data"]
+    assert metrics.items_count == 3
+
     s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
         "created_at"
@@ -664,7 +667,7 @@
-def test_cursor_path_none_updates_incremental_cursor_2(item_type: TestDataItemFormat) -> None:
+def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2(item_type: TestDataItemFormat) -> None:
     data = [
         {"id": 1, "created_at": 1},
         {"id": 2, "created_at": None},
@@ -677,7 +682,11 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n
on_cursor_value_n @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_updates_incremental_cursor_3(item_type: TestDataItemFormat) -> None: +def test_cursor_path_none_includes_records_and_updates_incremental_cursor_3(item_type: TestDataItemFormat) -> None: data = [ {"id": 1, "created_at": 1}, {"id": 2, "created_at": 2}, @@ -694,7 +702,11 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) - p.extract(some_data()) + extract_info = p.extract(some_data()) + load_id = extract_info.loads_ids[0] + metrics = extract_info.metrics[load_id][0]["table_metrics"]["some_data"] + assert metrics.items_count == 3 + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ "created_at" ] From a822499f84c4c319c3cd9b815fda18c098433487 Mon Sep 17 00:00:00 2001 From: Willi Date: Thu, 1 Aug 2024 16:42:02 +0530 Subject: [PATCH 09/32] tests with full pipeline run instead of only extraction with table metrics --- tests/extract/test_incremental.py | 136 +++++++++++++----------------- 1 file changed, 59 insertions(+), 77 deletions(-) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 7f607ca725..3c6ddfb185 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -44,6 +44,8 @@ ALL_TEST_DATA_ITEM_FORMATS, ) +from tests.pipeline.utils import assert_query_data + import pyarrow as pa @@ -639,7 +641,9 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_includes_records_and_updates_incremental_cursor_1(item_type: TestDataItemFormat) -> None: +def test_cursor_path_none_includes_records_and_updates_incremental_cursor_1( + item_type: TestDataItemFormat, +) -> None: data = [ {"id": 1, "created_at": None}, {"id": 2, "created_at": 1}, @@ -652,10 +656,9 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) - extract_info = p.extract(some_data()) - load_id = extract_info.loads_ids[0] - metrics = extract_info.metrics[load_id][0]["table_metrics"]["some_data"] - assert metrics.items_count == 3 + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(id) from some_data", [3]) s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ "created_at" @@ -664,7 +667,9 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2(item_type: TestDataItemFormat) -> None: +def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2( + item_type: TestDataItemFormat, +) -> None: data = [ {"id": 1, "created_at": 1}, {"id": 2, "created_at": None}, @@ -677,10 +682,9 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) - extract_info = p.extract(some_data()) - load_id = extract_info.loads_ids[0] - metrics = extract_info.metrics[load_id][0]["table_metrics"]["some_data"] - assert metrics.items_count == 3 + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(id) from some_data", [3]) s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ 
"created_at" @@ -689,7 +693,9 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_includes_records_and_updates_incremental_cursor_3(item_type: TestDataItemFormat) -> None: +def test_cursor_path_none_includes_records_and_updates_incremental_cursor_3( + item_type: TestDataItemFormat, +) -> None: data = [ {"id": 1, "created_at": 1}, {"id": 2, "created_at": 2}, @@ -702,10 +708,8 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) - extract_info = p.extract(some_data()) - load_id = extract_info.loads_ids[0] - metrics = extract_info.metrics[load_id][0]["table_metrics"]["some_data"] - assert metrics.items_count == 3 + p.run(some_data(), destination="duckdb") + assert_query_data(p, "select count(id) from some_data", [3]) s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ "created_at" @@ -714,7 +718,7 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n def test_cursor_path_none_can_raise_on_none() -> None: - # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails + # No None support for pandas and arrow yet source_items = [ {"id": 1, "created_at": 1}, {"id": 2, "created_at": None}, @@ -772,10 +776,25 @@ def some_data( "data.items[*].created_at", on_cursor_value_none="include" ) ): - yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} + yield { + "data": { + "items": [ + {"created_at": None}, + {"created_at": 1}, + ] + } + } results = list(some_data()) - assert results[0]["data"]["items"] == [{"created_at": None}, {"created_at": 1}] + assert results[0]["data"]["items"] == [ + {"created_at": None}, + {"created_at": 1}, + ] + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(*) from some_data__data__items", [2]) def test_cursor_path_none_nested_can_include_on_none_2() -> None: @@ -786,76 +805,39 @@ def some_data( "data.items[0].created_at", on_cursor_value_none="include" ) ): - yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} + yield { + "data": { + "items": [ + {"created_at": None}, + {"created_at": 1}, + ] + } + } results = list(some_data()) - assert results[0]["data"]["items"] == [{"created_at": None}, {"created_at": 1}] - - -@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_can_include_on_none_1(item_type: TestDataItemFormat) -> None: - data = [ - {"id": 1, "created_at": 1}, - {"id": 2, "created_at": None}, - {"id": 3, "created_at": 2}, - ] - source_items = data_to_item_format(item_type, data) - - @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): - yield source_items - - result = data_item_to_list(item_type, list(some_data())) - assert data_item_length(result) == 3 - - -@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_can_include_on_none_2(item_type: TestDataItemFormat) -> None: - data = [ - {"id": 1, "created_at": None}, - {"id": 2, "created_at": 1}, - {"id": 3, "created_at": 2}, - ] - source_items = data_to_item_format(item_type, data) - - @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): - yield source_items - - result = data_item_to_list(item_type, 
list(some_data())) - assert data_item_length(result) == 3 - - -@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) -def test_cursor_path_none_can_include_on_none_3(item_type: TestDataItemFormat) -> None: - data = [ - {"id": 1, "created_at": 1}, - {"id": 2, "created_at": 1}, - {"id": 3, "created_at": None}, + assert results[0]["data"]["items"] == [ + {"created_at": None}, + {"created_at": 1}, ] - source_items = data_to_item_format(item_type, data) - @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): - yield source_items + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") - result = list(some_data()) - values = data_item_to_list(item_type, result) - assert data_item_length(values) == 3 + assert_query_data(p, "select count(*) from some_data__data__items", [2]) @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_set_default_value_for_incremental_cursor(item_type: TestDataItemFormat) -> None: - data = [ - {"id": 1, "created_at": 1, "updated_at": 1}, - {"id": 2, "created_at": 4, "updated_at": None}, - {"id": 3, "created_at": 3, "updated_at": 3}, - ] - source_items = data_to_item_format(item_type, data) - @dlt.resource def some_data(created_at=dlt.sources.incremental("updated_at")): - yield source_items + yield data_to_item_format( + item_type, + [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 2, "created_at": 4, "updated_at": None}, + {"id": 3, "created_at": 3, "updated_at": 3}, + ], + ) def set_default_updated_at(record): if record.get("updated_at") is None: From 89118b0856957a214c2bd9a784d5a8d04db74006 Mon Sep 17 00:00:00 2001 From: Willi Date: Thu, 1 Aug 2024 18:13:37 +0530 Subject: [PATCH 10/32] Allows exclusion of None values for python Objects, not Pandas or Arrows --- dlt/extract/incremental/__init__.py | 2 +- dlt/extract/incremental/transform.py | 7 ++++++- dlt/extract/incremental/typing.py | 2 +- tests/extract/test_incremental.py | 25 +++++++++++++++++++++++++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index b4ff44f8e2..a4f57142d3 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -100,7 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class. The values passed explicitly to Incremental will be ignored. 
Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded - on_cursor_value_none: Specify what happens when a record has `None` at the cursor_path: raise, include + on_cursor_value_none: Specify what happens when a record has `None` at the cursor_path: raise, include, exclude """ # this is config/dataclass so declare members diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 7acbb9ef29..7225b7b57b 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -120,7 +120,9 @@ def find_cursor_value(self, row: TDataItem) -> Any: Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict """ - if self.cursor_path not in row.keys() and not self._compiled_cursor_path: + if not self._compiled_cursor_path and self.cursor_path not in row.keys(): + if self.on_cursor_value_none == "exclude": + return None raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) row_value = self._value_at_cursor_path(row) @@ -156,6 +158,9 @@ def __call__( return row, False, False row_value = self.find_cursor_value(row) + if row_value is None and self.on_cursor_value_none == "exclude": + return None, False, False + last_value = self.last_value last_value_func = self.last_value_func diff --git a/dlt/extract/incremental/typing.py b/dlt/extract/incremental/typing.py index 14a1435a8a..368c9e10b8 100644 --- a/dlt/extract/incremental/typing.py +++ b/dlt/extract/incremental/typing.py @@ -3,7 +3,7 @@ TCursorValue = TypeVar("TCursorValue", bound=Any) LastValueFunc = Callable[[Sequence[TCursorValue]], Any] -OnCursorValueNone = Literal["raise", "include"] +OnCursorValueNone = Literal["raise", "include", "exclude"] class IncrementalColumnState(TypedDict): diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 3c6ddfb185..9bf569e537 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -717,6 +717,31 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n assert s["last_value"] == 2 +@pytest.mark.parametrize("item_type", ["object"]) +def test_cursor_path_none_excludes_records_and_updates_incremental_cursor( + item_type: TestDataItemFormat, +) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": 2}, + {"id": 3, "created_at": None}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="exclude")): + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + assert_query_data(p, "select count(id) from some_data", [2]) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 2 + + def test_cursor_path_none_can_raise_on_none() -> None: # No None support for pandas and arrow yet source_items = [ From d09436b67239ead7868a2f3077eb1cd274625083 Mon Sep 17 00:00:00 2001 From: Willi Date: Thu, 1 Aug 2024 18:52:32 +0530 Subject: [PATCH 11/32] refactors for speed and readability --- dlt/extract/incremental/transform.py | 55 +++++++++++++++++----------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 7225b7b57b..b52bc8478a 100644 --- 
a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -120,32 +120,43 @@ def find_cursor_value(self, row: TDataItem) -> Any: Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict """ - if not self._compiled_cursor_path and self.cursor_path not in row.keys(): - if self.on_cursor_value_none == "exclude": - return None - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - - row_value = self._value_at_cursor_path(row) - - if self.on_cursor_value_none == "raise" and row_value is None: - raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) - - return row_value - - def _value_at_cursor_path(self, row: TDataItem) -> Any: - row_value = row.get(self.cursor_path, None) - if self._compiled_cursor_path: cursor_values = find_values(self._compiled_cursor_path, row) - if cursor_values == [] and self.on_cursor_value_none == "raise": - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - elif None in cursor_values and self.on_cursor_value_none == "raise": - raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) - else: - # ignores the other found values, e.g. when the path is $data.items[*].created_at - row_value = cursor_values[0] + if self.on_cursor_value_none == "raise": + if cursor_values == []: + raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + elif None in cursor_values: + raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) + elif self.on_cursor_value_none == "include": + # TODO: decide if we also want to raise if the field is not present + if cursor_values == []: + raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + elif self.on_cursor_value_none == "exclude": + if cursor_values == [] or None in cursor_values: + return None + + # ignores the other found values, e.g. 
when the path is $data.items[*].created_at + row_value = cursor_values[0] + else: + row_value = row.get(self.cursor_path) + if row_value is None: + if self.on_cursor_value_none == "raise": + if self.cursor_path not in row.keys(): + raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + else: + raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) + if self.on_cursor_value_none == "exclude": + return None + if self.on_cursor_value_none == "include": + # TODO: decide if we also want to raise if the field is not present + if self.cursor_path not in row.keys(): + raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + else: + return row_value + return row_value + def __call__( self, row: TDataItem, From e282c9f34ccf4452a00a463749da3b5d47a967ab Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 2 Aug 2024 16:00:45 +0530 Subject: [PATCH 12/32] shortens implementation --- dlt/extract/incremental/__init__.py | 2 ++ dlt/extract/incremental/transform.py | 22 +++++++++------------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index a4f57142d3..34a7030ff1 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -141,6 +141,8 @@ def __init__( self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key self.row_order = row_order self.allow_external_schedulers = allow_external_schedulers + if on_cursor_value_none not in ["raise", "include", "exclude"]: + raise ValueError(f"Unexpected argument for on_cursor_value_none. Got {on_cursor_value_none}") self.on_cursor_value_none = on_cursor_value_none self._cached_state: IncrementalColumnState = None diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index b52bc8478a..97cea6829e 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -122,17 +122,15 @@ def find_cursor_value(self, row: TDataItem) -> Any: """ if self._compiled_cursor_path: cursor_values = find_values(self._compiled_cursor_path, row) - if self.on_cursor_value_none == "raise": - if cursor_values == []: + if cursor_values == []: + if self.on_cursor_value_none == "raise": raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - elif None in cursor_values: + elif self.on_cursor_value_none == "exclude": + return None + elif None in cursor_values: + if self.on_cursor_value_none == "raise": raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) - elif self.on_cursor_value_none == "include": - # TODO: decide if we also want to raise if the field is not present - if cursor_values == []: - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - elif self.on_cursor_value_none == "exclude": - if cursor_values == [] or None in cursor_values: + elif self.on_cursor_value_none == "exclude": return None # ignores the other found values, e.g. 
when the path is $data.items[*].created_at @@ -145,14 +143,12 @@ def find_cursor_value(self, row: TDataItem) -> Any: raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) else: raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) - if self.on_cursor_value_none == "exclude": + elif self.on_cursor_value_none == "exclude": return None - if self.on_cursor_value_none == "include": + elif self.on_cursor_value_none == "include": # TODO: decide if we also want to raise if the field is not present if self.cursor_path not in row.keys(): raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - else: - return row_value return row_value From 30e3f1f9fd17c5682700608491f4d7ef53943220 Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 2 Aug 2024 16:15:02 +0530 Subject: [PATCH 13/32] suggestions from code review --- dlt/extract/incremental/transform.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 97cea6829e..e63adbeed8 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -165,8 +165,8 @@ def __call__( return row, False, False row_value = self.find_cursor_value(row) - if row_value is None and self.on_cursor_value_none == "exclude": - return None, False, False + if row_value is None: + return row, False, False last_value = self.last_value last_value_func = self.last_value_func @@ -189,22 +189,14 @@ def __call__( ): return None, False, True - if (row_value is None and last_value is None) or ( - row_value is None and self.start_value is None - ): - # store rows with "max" values to compute hashes after processing full batch - self.last_rows = [row] - self.unique_hashes = set() - return row, False, False - row_value_ignored_none = (row_value,) if row_value is not None else () - check_values = (row_value_ignored_none) + ((last_value,) if last_value is not None else ()) + check_values = (row_value,) + ((last_value,) if last_value is not None else ()) new_value = last_value_func(check_values) # new_value is "less" or equal to last_value (the actual max) if last_value == new_value: # use func to compute row_value into last_value compatible processed_row_value = last_value_func((row_value,)) # skip the record that is not a start_value or new_value: that record was already processed - check_values = row_value_ignored_none + ( + check_values = (row_value,) + ( (self.start_value,) if self.start_value is not None else () ) new_value = last_value_func(check_values) From 45b7635dec825666e8d3a1badc4db1d6da377c37 Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 2 Aug 2024 16:43:15 +0530 Subject: [PATCH 14/32] fixes exclusion of rows with None cursor values --- dlt/extract/incremental/transform.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index e63adbeed8..d850e31d9a 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -166,7 +166,10 @@ def __call__( row_value = self.find_cursor_value(row) if row_value is None: - return row, False, False + if self.on_cursor_value_none == "exclude": + return None, False, False + else: + return row, False, False last_value = self.last_value last_value_func = self.last_value_func From 00263fff068a151a7e13a51e37b62f67a9938ac9 Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 2 Aug 2024 16:55:37 +0530 Subject: [PATCH 
15/32] `include` includes also rows where cursor_path is missing --- dlt/extract/incremental/transform.py | 4 --- tests/extract/test_incremental.py | 50 ++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index d850e31d9a..8f3ae49d67 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -145,10 +145,6 @@ def find_cursor_value(self, row: TDataItem) -> Any: raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) elif self.on_cursor_value_none == "exclude": return None - elif self.on_cursor_value_none == "include": - # TODO: decide if we also want to raise if the field is not present - if self.cursor_path not in row.keys(): - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) return row_value diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 9bf569e537..42bda5cd53 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -716,6 +716,28 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_n ] assert s["last_value"] == 2 +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_includes_records_without_cursor_path( + item_type: TestDataItemFormat, +) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + assert_query_data(p, "select count(id) from some_data", [2]) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 1 @pytest.mark.parametrize("item_type", ["object"]) def test_cursor_path_none_excludes_records_and_updates_incremental_cursor( @@ -851,6 +873,34 @@ def some_data( assert_query_data(p, "select count(*) from some_data__data__items", [2]) +def test_cursor_path_none_nested_includes_rows_without_cursor_path() -> None: + # No nested json path support for pandas and arrow. 
See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "data.items[*].created_at", on_cursor_value_none="include" + ) + ): + yield { + "data": { + "items": [ + {"id": 1}, + {"id": 2, "created_at": 2}, + ] + } + } + + results = list(some_data()) + assert results[0]["data"]["items"] == [ + {"id": 1}, + {"id": 2, "created_at": 2}, + ] + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(*) from some_data__data__items", [2]) + @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_set_default_value_for_incremental_cursor(item_type: TestDataItemFormat) -> None: @dlt.resource From a51675418c2bcfe0aabf2a97b1aab21617dc6b0b Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 2 Aug 2024 17:03:53 +0530 Subject: [PATCH 16/32] Clarifies that value at cursor_path and cursor_path not in records are treated equally --- dlt/extract/incremental/__init__.py | 16 ++++++++-------- dlt/extract/incremental/transform.py | 20 ++++++++++---------- dlt/extract/incremental/typing.py | 2 +- tests/extract/test_incremental.py | 22 +++++++++++----------- 4 files changed, 30 insertions(+), 30 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 34a7030ff1..6003f5e5db 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -39,7 +39,7 @@ IncrementalColumnState, TCursorValue, LastValueFunc, - OnCursorValueNone, + OnCursorValueMissing, ) from dlt.extract.pipe import Pipe from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform @@ -100,7 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class. The values passed explicitly to Incremental will be ignored. Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded - on_cursor_value_none: Specify what happens when a record has `None` at the cursor_path: raise, include, exclude + on_cursor_value_missing: Specify what happens when the cursor_path does not exist in a record or a record has `None` at the cursor_path: raise, include, exclude """ # this is config/dataclass so declare members @@ -110,7 +110,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa end_value: Optional[Any] = None row_order: Optional[TSortOrder] = None allow_external_schedulers: bool = False - on_cursor_value_none: OnCursorValueNone = "raise" + on_cursor_value_missing: OnCursorValueMissing = "raise" # incremental acting as empty EMPTY: ClassVar["Incremental[Any]"] = None @@ -125,7 +125,7 @@ def __init__( end_value: Optional[TCursorValue] = None, row_order: Optional[TSortOrder] = None, allow_external_schedulers: bool = False, - on_cursor_value_none: OnCursorValueNone = "raise", + on_cursor_value_missing: OnCursorValueMissing = "raise", ) -> None: # make sure that path is valid if cursor_path: @@ -141,9 +141,9 @@ def __init__( self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key self.row_order = row_order self.allow_external_schedulers = allow_external_schedulers - if on_cursor_value_none not in ["raise", "include", "exclude"]: - raise ValueError(f"Unexpected argument for on_cursor_value_none. 
Got {on_cursor_value_none}") - self.on_cursor_value_none = on_cursor_value_none + if on_cursor_value_missing not in ["raise", "include", "exclude"]: + raise ValueError(f"Unexpected argument for on_cursor_value_none. Got {on_cursor_value_missing}") + self.on_cursor_value_missing = on_cursor_value_missing self._cached_state: IncrementalColumnState = None """State dictionary cached on first access""" @@ -182,7 +182,7 @@ def _make_transforms(self) -> None: self.last_value_func, self._primary_key, set(self._cached_state["unique_hashes"]), - self.on_cursor_value_none, + self.on_cursor_value_missing, ) @classmethod diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 8f3ae49d67..b145bfc04e 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -12,7 +12,7 @@ IncrementalPrimaryKeyMissing, IncrementalCursorPathHasValueNone, ) -from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueNone +from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing from dlt.extract.utils import resolve_column_value from dlt.extract.items import TTableHintTemplate from dlt.common.schema.typing import TColumnNames @@ -55,7 +55,7 @@ def __init__( last_value_func: LastValueFunc[TCursorValue], primary_key: Optional[TTableHintTemplate[TColumnNames]], unique_hashes: Set[str], - on_cursor_value_none: OnCursorValueNone = "raise", + on_cursor_value_missing: OnCursorValueMissing = "raise", ) -> None: self.resource_name = resource_name self.cursor_path = cursor_path @@ -68,7 +68,7 @@ def __init__( self.primary_key = primary_key self.unique_hashes = unique_hashes self.start_unique_hashes = set(unique_hashes) - self.on_cursor_value_none = on_cursor_value_none + self.on_cursor_value_missing = on_cursor_value_missing # compile jsonpath self._compiled_cursor_path = compile_path(cursor_path) @@ -123,14 +123,14 @@ def find_cursor_value(self, row: TDataItem) -> Any: if self._compiled_cursor_path: cursor_values = find_values(self._compiled_cursor_path, row) if cursor_values == []: - if self.on_cursor_value_none == "raise": + if self.on_cursor_value_missing == "raise": raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - elif self.on_cursor_value_none == "exclude": + elif self.on_cursor_value_missing == "exclude": return None elif None in cursor_values: - if self.on_cursor_value_none == "raise": + if self.on_cursor_value_missing == "raise": raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) - elif self.on_cursor_value_none == "exclude": + elif self.on_cursor_value_missing == "exclude": return None # ignores the other found values, e.g. 
when the path is $data.items[*].created_at @@ -138,12 +138,12 @@ def find_cursor_value(self, row: TDataItem) -> Any: else: row_value = row.get(self.cursor_path) if row_value is None: - if self.on_cursor_value_none == "raise": + if self.on_cursor_value_missing == "raise": if self.cursor_path not in row.keys(): raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) else: raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) - elif self.on_cursor_value_none == "exclude": + elif self.on_cursor_value_missing == "exclude": return None return row_value @@ -162,7 +162,7 @@ def __call__( row_value = self.find_cursor_value(row) if row_value is None: - if self.on_cursor_value_none == "exclude": + if self.on_cursor_value_missing == "exclude": return None, False, False else: return row, False, False diff --git a/dlt/extract/incremental/typing.py b/dlt/extract/incremental/typing.py index 368c9e10b8..a5e2612db4 100644 --- a/dlt/extract/incremental/typing.py +++ b/dlt/extract/incremental/typing.py @@ -3,7 +3,7 @@ TCursorValue = TypeVar("TCursorValue", bound=Any) LastValueFunc = Callable[[Sequence[TCursorValue]], Any] -OnCursorValueNone = Literal["raise", "include", "exclude"] +OnCursorValueMissing = Literal["raise", "include", "exclude"] class IncrementalColumnState(TypedDict): diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 42bda5cd53..76e8f8e0d2 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -652,7 +652,7 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_1( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -678,7 +678,7 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -704,7 +704,7 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_3( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -727,7 +727,7 @@ def test_cursor_path_none_includes_records_without_cursor_path( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")): + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -751,7 +751,7 @@ def test_cursor_path_none_excludes_records_and_updates_incremental_cursor( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="exclude")): + def some_data(created_at=dlt.sources.incremental("created_at", 
on_cursor_value_missing="exclude")): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -773,7 +773,7 @@ def test_cursor_path_none_can_raise_on_none() -> None: ] @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="raise")): + def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise")): yield source_items with pytest.raises(IncrementalCursorPathHasValueNone) as py_ex: @@ -793,7 +793,7 @@ def test_cursor_path_none_nested_can_raise_on_none_1() -> None: # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails @dlt.resource def some_data( - created_at=dlt.sources.incremental("data.items[0].created_at", on_cursor_value_none="raise") + created_at=dlt.sources.incremental("data.items[0].created_at", on_cursor_value_missing="raise") ): yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} @@ -806,7 +806,7 @@ def test_cursor_path_none_nested_can_raise_on_none_2() -> None: # No pandas and arrow. See test_nested_cursor_path_arrow_fails @dlt.resource def some_data( - created_at=dlt.sources.incremental("data.items[*].created_at", on_cursor_value_none="raise") + created_at=dlt.sources.incremental("data.items[*].created_at", on_cursor_value_missing="raise") ): yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} @@ -820,7 +820,7 @@ def test_cursor_path_none_nested_can_include_on_none_1() -> None: @dlt.resource def some_data( created_at=dlt.sources.incremental( - "data.items[*].created_at", on_cursor_value_none="include" + "data.items[*].created_at", on_cursor_value_missing="include" ) ): yield { @@ -849,7 +849,7 @@ def test_cursor_path_none_nested_can_include_on_none_2() -> None: @dlt.resource def some_data( created_at=dlt.sources.incremental( - "data.items[0].created_at", on_cursor_value_none="include" + "data.items[0].created_at", on_cursor_value_missing="include" ) ): yield { @@ -878,7 +878,7 @@ def test_cursor_path_none_nested_includes_rows_without_cursor_path() -> None: @dlt.resource def some_data( created_at=dlt.sources.incremental( - "data.items[*].created_at", on_cursor_value_none="include" + "data.items[*].created_at", on_cursor_value_missing="include" ) ): yield { From 2eb2928479fbc5c0c7726d08ebcd2e276d3cc120 Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 2 Aug 2024 17:06:12 +0530 Subject: [PATCH 17/32] makes supplied cursor_path pop out in error messages --- dlt/extract/incremental/exceptions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dlt/extract/incremental/exceptions.py b/dlt/extract/incremental/exceptions.py index 1eebccb37c..539a211280 100644 --- a/dlt/extract/incremental/exceptions.py +++ b/dlt/extract/incremental/exceptions.py @@ -8,7 +8,7 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N self.item = item msg = ( msg - or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in database." + or f"Cursor element with JSON path `{json_path}` was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in database." 
) super().__init__(pipe_name, msg) @@ -19,7 +19,7 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N self.item = item msg = ( msg - or f"Cursor element with JSON path {json_path} has the value `None` in extracted data item. All data items must contain a value != None. Construct the incremental with on_cursor_value_none='include' if you want to include such rows" + or f"Cursor element with JSON path `{json_path}` has the value `None` in extracted data item. All data items must contain a value != None. Construct the incremental with on_cursor_value_missing='include' if you want to include such rows" ) super().__init__(pipe_name, msg) From c3e63e856a9f260f48a526d2fc2e6f1097b0d9af Mon Sep 17 00:00:00 2001 From: Willi Date: Fri, 2 Aug 2024 17:31:41 +0530 Subject: [PATCH 18/32] code formatting & docs update --- dlt/extract/incremental/__init__.py | 4 +- dlt/extract/incremental/transform.py | 13 ++++-- .../docs/general-usage/incremental-loading.md | 44 ++++++++++++++----- tests/extract/test_incremental.py | 35 +++++++++++---- 4 files changed, 73 insertions(+), 23 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 6003f5e5db..343a737c07 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -142,7 +142,9 @@ def __init__( self.row_order = row_order self.allow_external_schedulers = allow_external_schedulers if on_cursor_value_missing not in ["raise", "include", "exclude"]: - raise ValueError(f"Unexpected argument for on_cursor_value_none. Got {on_cursor_value_missing}") + raise ValueError( + f"Unexpected argument for on_cursor_value_missing. Got {on_cursor_value_missing}" + ) self.on_cursor_value_missing = on_cursor_value_missing self._cached_state: IncrementalColumnState = None diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index b145bfc04e..757d9e96a2 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -129,7 +129,9 @@ def find_cursor_value(self, row: TDataItem) -> Any: return None elif None in cursor_values: if self.on_cursor_value_missing == "raise": - raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) + raise IncrementalCursorPathHasValueNone( + self.resource_name, self.cursor_path, row + ) elif self.on_cursor_value_missing == "exclude": return None @@ -140,15 +142,18 @@ def find_cursor_value(self, row: TDataItem) -> Any: if row_value is None: if self.on_cursor_value_missing == "raise": if self.cursor_path not in row.keys(): - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) + raise IncrementalCursorPathMissing( + self.resource_name, self.cursor_path, row + ) else: - raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row) + raise IncrementalCursorPathHasValueNone( + self.resource_name, self.cursor_path, row + ) elif self.on_cursor_value_missing == "exclude": return None return row_value - def __call__( self, row: TDataItem, diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 431cc3532d..f7b6694c22 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -878,34 +878,58 @@ Consider the example below for reading incremental loading parameters from "conf ``` `id_after` incrementally stores the latest `cursor_path` value for future pipeline runs.
-### Loading NULL values in the incremental cursor field +### Loading when incremental cursor path is missing or value is None/NULL + +You can customize the incremental processing of dlt by setting the parameter `on_cursor_value_missing`. + +When loading incrementally there are two assumptions: +1. each row contains the cursor path +2. each row is expected to contain a value at the cursor path that is not `None`. -When loading incrementally with a cursor field, each row is expected to contain a value at the cursor field that is not `None`. For example, the following source data will raise an error: ```py @dlt.resource -def some_data(updated_at=dlt.sources.incremental("updated_at")): +def some_data_without_cursor_path(updated_at=dlt.sources.incremental("updated_at")): yield [ {"id": 1, "created_at": 1, "updated_at": 1}, - {"id": 2, "created_at": 2, "updated_at": 2}, - {"id": 3, "created_at": 4, "updated_at": None}, + {"id": 2, "created_at": 2}, # cursor field is missing ] -list(some_data()) +list(some_data_without_cursor_path()) + +@dlt.resource +def some_data_without_cursor_value(updated_at=dlt.sources.incremental("updated_at")): + yield [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 3, "created_at": 4, "updated_at": None}, # value at cursor field is None + ] + +list(some_data_without_cursor_value()) ``` -If you want to load data that includes `None` values there are two options: +If you want to load data that includes rows without the cursor path or `None` values there are two options: -1. Transform the values at the incremental cursor to a value different from `None` before the incremental object is called. [See docs below](#transform-records-before-incremental-processing) -2. Configure the incremental load to tolerate `None` values using `incremental(..., on_cursor_value_none="include")`. +1. Before the incremental processing begins: Ensure that the incremental field is present and transform the values at the incremental cursor to a value different from `None` . [See docs below](#transform-records-before-incremental-processing) +2. Configure the incremental load to tolerate the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="include")`. Example: ```py @dlt.resource +def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="include")): + yield [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 2, "created_at": 2}, + {"id": 3, "created_at": 4, "updated_at": None}, + ] -list(some_data()) +result = list(some_data()) +assert len(result) == 3 +assert result[1] == {"id": 2, "created_at": 2} +assert result[2] == {"id": 3, "created_at": 4, "updated_at": None} ``` +Similarly, when the cursor path + ### Transform records before incremental processing If you want to load data that includes `None` values you can transform the records before the incremental processing. You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data). 
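The transform option named at the end of the docs section above can be wired up with `add_map`; the following is a minimal sketch, assuming dlt's `add_map(..., insert_at=1)` places the mapping step ahead of the incremental transform in the pipe (the `set_default_updated_at` helper mirrors the one used in the tests of this series):

```py
import dlt

@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
    yield [
        {"id": 1, "created_at": 1, "updated_at": 1},
        {"id": 2, "created_at": 4, "updated_at": None},
    ]

def set_default_updated_at(record):
    # coalesce the cursor field before the incremental step sees the record
    if record.get("updated_at") is None:
        record["updated_at"] = record["created_at"]
    return record

# insert_at=1 runs the map before the incremental transform, so no row
# reaches dlt.sources.incremental with a None cursor value
rows = list(some_data().add_map(set_default_updated_at, insert_at=1))
assert all(row["updated_at"] is not None for row in rows)
```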
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 76e8f8e0d2..52d5baabf0 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -652,7 +652,9 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_1( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")): + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include") + ): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -678,7 +680,9 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")): + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include") + ): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -704,7 +708,9 @@ def test_cursor_path_none_includes_records_and_updates_incremental_cursor_3( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")): + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include") + ): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -716,6 +722,7 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_m ] assert s["last_value"] == 2 + @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_cursor_path_none_includes_records_without_cursor_path( item_type: TestDataItemFormat, @@ -727,7 +734,9 @@ def test_cursor_path_none_includes_records_without_cursor_path( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include")): + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include") + ): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -739,6 +748,7 @@ def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_m ] assert s["last_value"] == 1 + @pytest.mark.parametrize("item_type", ["object"]) def test_cursor_path_none_excludes_records_and_updates_incremental_cursor( item_type: TestDataItemFormat, @@ -751,7 +761,9 @@ def test_cursor_path_none_excludes_records_and_updates_incremental_cursor( source_items = data_to_item_format(item_type, data) @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="exclude")): + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="exclude") + ): yield source_items p = dlt.pipeline(pipeline_name=uniq_id()) @@ -773,7 +785,9 @@ def test_cursor_path_none_can_raise_on_none() -> None: ] @dlt.resource - def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise")): + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise") + ): yield source_items with pytest.raises(IncrementalCursorPathHasValueNone) as py_ex: @@ -793,7 +807,9 @@ def test_cursor_path_none_nested_can_raise_on_none_1() -> None: # No nested json path support for pandas and arrow. 
See test_nested_cursor_path_arrow_fails @dlt.resource def some_data( - created_at=dlt.sources.incremental("data.items[0].created_at", on_cursor_value_missing="raise") + created_at=dlt.sources.incremental( + "data.items[0].created_at", on_cursor_value_missing="raise" + ) ): yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} @@ -806,7 +822,9 @@ def test_cursor_path_none_nested_can_raise_on_none_2() -> None: # No pandas and arrow. See test_nested_cursor_path_arrow_fails @dlt.resource def some_data( - created_at=dlt.sources.incremental("data.items[*].created_at", on_cursor_value_missing="raise") + created_at=dlt.sources.incremental( + "data.items[*].created_at", on_cursor_value_missing="raise" + ) ): yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} @@ -901,6 +919,7 @@ def some_data( assert_query_data(p, "select count(*) from some_data__data__items", [2]) + @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_set_default_value_for_incremental_cursor(item_type: TestDataItemFormat) -> None: @dlt.resource From ab5e334e5e8a32626625e4f755034c3d72ce07e9 Mon Sep 17 00:00:00 2001 From: Willi Date: Mon, 5 Aug 2024 17:52:21 +0530 Subject: [PATCH 19/32] integrate docs for the four ways of incremental processing of missing cursor value --- .../docs/general-usage/incremental-loading.md | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index f7b6694c22..9db5f2658c 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -882,11 +882,11 @@ Consider the example below for reading incremental loading parameters from "conf You can customize the incremental processing of dlt by setting the parameter `on_cursor_value_missing`. -When loading incrementally there are two assumptions: +When loading incrementally with the default settings, there are two assumptions: 1. each row contains the cursor path 2. each row is expected to contain a value at the cursor path that is not `None`. -For example, the following source data will raise an error: +For example, the following two sources will raise an error: ```py @dlt.resource def some_data_without_cursor_path(updated_at=dlt.sources.incremental("updated_at")): yield [ {"id": 1, "created_at": 1, "updated_at": 1}, {"id": 2, "created_at": 2}, # cursor field is missing ] @@ -907,12 +907,15 @@ def some_data_without_cursor_value(updated_at=dlt.sources.incremental("updated_a list(some_data_without_cursor_value()) ``` -If you want to load data that includes rows without the cursor path or `None` values there are two options: -1. Before the incremental processing begins: Ensure that the incremental field is present and transform the values at the incremental cursor to a value different from `None` . [See docs below](#transform-records-before-incremental-processing) -2. Configure the incremental load to tolerate the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="include")`. +To process a data set where some records do not include the incremental cursor path or where the values at the cursor path are `None`, there are the following four options: + +1. Configure the incremental load to raise an exception in case there is a row where the cursor path is missing or has the value `None` using `incremental(..., on_cursor_value_missing="raise")`. This is the default behavior. 2. Configure the incremental load to tolerate the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="include")`. +3.
Configure the incremental load to exclude records with a missing cursor path or a `None` cursor value using `incremental(..., on_cursor_value_missing="exclude")`. +4. Before the incremental processing begins: Ensure that the incremental field is present and transform the values at the incremental cursor to a value different from `None`. [See docs below](#transform-records-before-incremental-processing) -Example: +Here is an example of including rows where the incremental cursor value is missing or `None`: ```py @dlt.resource def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="include")): yield [ {"id": 1, "created_at": 1, "updated_at": 1}, {"id": 2, "created_at": 2}, {"id": 3, "created_at": 4, "updated_at": None}, ] -list(some_data()) +result = list(some_data()) +assert len(result) == 3 +assert result[1] == {"id": 2, "created_at": 2} +assert result[2] == {"id": 3, "created_at": 4, "updated_at": None} ``` -Similarly, when the cursor path +If you do not want to import records without the cursor path or where the value at the cursor path is `None`, use the following incremental configuration: + +```py +@dlt.resource +def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="exclude")): + yield [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 2, "created_at": 2}, + {"id": 3, "created_at": 4, "updated_at": None}, + ] + +result = list(some_data()) +assert len(result) == 1 +``` ### Transform records before incremental processing If you want to load data that includes `None` values you can transform the records before the incremental processing. From 2820756c3ca3024f37a60c1e09b064a4c15facb5 Mon Sep 17 00:00:00 2001 From: Willi Date: Mon, 5 Aug 2024 19:07:21 +0530 Subject: [PATCH 20/32] arrow: supports exclusion of rows where cursor value is missing --- dlt/extract/incremental/transform.py | 10 +++++++++- tests/extract/test_incremental.py | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 757d9e96a2..c0989fdd43 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -305,6 +305,9 @@ def __call__( # TODO: Json path support. For now assume the cursor_path is a column name cursor_path = self.cursor_path + if self.on_cursor_value_missing == "exclude": + tbl = self._remove_null_at_cursor_path(tbl, cursor_path) + # The new max/min value try: # NOTE: datetimes are always pendulum in UTC @@ -316,7 +319,7 @@ def __call__( self.resource_name, cursor_path, tbl, - f"Column name {cursor_path} was not found in the arrow table. Not nested JSON paths" + f"Column name `{cursor_path}` was not found in the arrow table.
Nested JSON paths" " are not supported for arrow tables and dataframes, the incremental cursor_path" " must be a column name.", ) from e @@ -389,3 +392,8 @@ def __call__( if is_pandas: return tbl.to_pandas(), start_out_of_range, end_out_of_range return tbl, start_out_of_range, end_out_of_range + + def _remove_null_at_cursor_path(self, tbl, cursor_path): + mask = pa.compute.is_valid(tbl[cursor_path]) + filtered = tbl.filter(mask) + return filtered diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 52d5baabf0..e71bd1c2d0 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -749,7 +749,7 @@ def some_data( assert s["last_value"] == 1 -@pytest.mark.parametrize("item_type", ["object"]) +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_cursor_path_none_excludes_records_and_updates_incremental_cursor( item_type: TestDataItemFormat, ) -> None: From d127e46e749df13a9ab3ddb8f0d319a239666d4b Mon Sep 17 00:00:00 2001 From: Willi Date: Mon, 5 Aug 2024 19:08:13 +0530 Subject: [PATCH 21/32] make test suite tighter --- tests/extract/test_incremental.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index e71bd1c2d0..93aef790fb 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -661,6 +661,7 @@ def some_data( p.run(some_data(), destination="duckdb") assert_query_data(p, "select count(id) from some_data", [3]) + assert_query_data(p, "select count(created_at) from some_data", [2]) s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ "created_at" @@ -689,6 +690,7 @@ def some_data( p.run(some_data(), destination="duckdb") assert_query_data(p, "select count(id) from some_data", [3]) + assert_query_data(p, "select count(created_at) from some_data", [2]) s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ "created_at" @@ -716,6 +718,7 @@ def some_data( p = dlt.pipeline(pipeline_name=uniq_id()) p.run(some_data(), destination="duckdb") assert_query_data(p, "select count(id) from some_data", [3]) + assert_query_data(p, "select count(created_at) from some_data", [2]) s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ "created_at" @@ -742,6 +745,7 @@ def some_data( p = dlt.pipeline(pipeline_name=uniq_id()) p.run(some_data(), destination="duckdb") assert_query_data(p, "select count(id) from some_data", [2]) + assert_query_data(p, "select count(created_at) from some_data", [1]) s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ "created_at" From 6d7d4c0937682dd98caaeb18771e95fae52b598a Mon Sep 17 00:00:00 2001 From: Willi Date: Wed, 7 Aug 2024 16:38:44 +0530 Subject: [PATCH 22/32] arrow-table: processes null rows and adds them back after processing. 
Fails for arrow-batch --- dlt/extract/incremental/exceptions.py | 10 +++- dlt/extract/incremental/transform.py | 21 +++++--- tests/extract/test_incremental.py | 71 +++++++++++++++++++++++++-- 3 files changed, 91 insertions(+), 11 deletions(-) diff --git a/dlt/extract/incremental/exceptions.py b/dlt/extract/incremental/exceptions.py index 539a211280..e056098ba6 100644 --- a/dlt/extract/incremental/exceptions.py +++ b/dlt/extract/incremental/exceptions.py @@ -2,8 +2,12 @@ from dlt.common.typing import TDataItem + + class IncrementalCursorPathMissing(PipeException): - def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None: + def __init__( + self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None + ) -> None: self.json_path = json_path self.item = item msg = ( @@ -14,7 +18,9 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N class IncrementalCursorPathHasValueNone(PipeException): - def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None: + def __init__( + self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None + ) -> None: self.json_path = json_path self.item = item msg = ( diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index c0989fdd43..a92b0c1a0e 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -305,8 +305,6 @@ def __call__( # TODO: Json path support. For now assume the cursor_path is a column name cursor_path = self.cursor_path - if self.on_cursor_value_missing == "exclude": - tbl = self._remove_null_at_cursor_path(tbl, cursor_path) # The new max/min value try: @@ -324,6 +322,11 @@ def __call__( " must be a column name.", ) from e + if tbl.schema.field(cursor_path).nullable: + tbl_without_null, tbl_with_null = self._process_null_at_cursor_path(tbl) + + tbl = tbl_without_null + # If end_value is provided, filter to include table rows that are "less" than end_value if self.end_value is not None: end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type) @@ -382,6 +385,8 @@ def __call__( ) ) ) + if self.on_cursor_value_missing == "include": + tbl = pa.concat_tables([tbl, tbl_with_null]) if len(tbl) == 0: return None, start_out_of_range, end_out_of_range @@ -393,7 +398,11 @@ def __call__( return tbl.to_pandas(), start_out_of_range, end_out_of_range return tbl, start_out_of_range, end_out_of_range - def _remove_null_at_cursor_path(self, tbl, cursor_path): - mask = pa.compute.is_valid(tbl[cursor_path]) - filtered = tbl.filter(mask) - return filtered + def _process_null_at_cursor_path(self, tbl: pa.Table) -> Tuple["pa.Table", "pa.Table"]: + mask = pa.compute.is_valid(tbl[self.cursor_path]) + rows_without_null = tbl.filter(mask) + rows_with_null = tbl.filter(pa.compute.invert(mask)) + if self.on_cursor_value_missing == "raise": + if rows_with_null.num_rows > 0: + raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path) + return rows_without_null, rows_with_null diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 93aef790fb..d579d9b9ee 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -780,13 +780,14 @@ def some_data( assert s["last_value"] == 2 -def test_cursor_path_none_can_raise_on_none() -> None: - # No None support for pandas and arrow yet - source_items = [ +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def 
test_cursor_path_none_can_raise_on_none_1(item_type: TestDataItemFormat) -> None: data = [ {"id": 1, "created_at": 1}, {"id": 2, "created_at": None}, {"id": 3, "created_at": 2}, ] + source_items = data_to_item_format(item_type, data) @dlt.resource def some_data( @@ -807,6 +808,70 @@ def some_data( assert pip_ex.value.__context__.json_path == "created_at" +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_can_raise_on_none_2(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise") + ): + yield source_items + + # there is no fixed schema, error because cursor path is missing + if item_type == "object": + with pytest.raises(IncrementalCursorPathMissing) as ex: + list(some_data()) + assert ex.value.json_path == "created_at" + # there is a fixed schema, error because value is null + else: + with pytest.raises(IncrementalCursorPathHasValueNone) as e: + list(some_data()) + assert e.value.json_path == "created_at" + + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as e: + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + if item_type == "object": + assert isinstance(e.value.__context__, IncrementalCursorPathMissing) + else: + assert isinstance(e.value.__context__, IncrementalCursorPathHasValueNone) + assert e.value.__context__.json_path == "created_at" + + +@pytest.mark.parametrize("item_type", ["arrow-table", "arrow-batch", "pandas"]) +def test_cursor_path_none_can_raise_on_column_missing(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise") + ): + yield source_items + + with pytest.raises(IncrementalCursorPathMissing) as py_ex: + list(some_data()) + assert py_ex.value.json_path == "created_at" + + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as pip_ex: + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + assert pip_ex.value.__context__.json_path == "created_at" + assert isinstance(pip_ex.value.__context__, IncrementalCursorPathMissing) + + def test_cursor_path_none_nested_can_raise_on_none_1() -> None: # No nested json path support for pandas and arrow.
See test_nested_cursor_path_arrow_fails @dlt.resource From d4b8b6881287ba13fc83f4b8ecda3ab76af4d78d Mon Sep 17 00:00:00 2001 From: Willi Date: Wed, 7 Aug 2024 16:56:57 +0530 Subject: [PATCH 23/32] supports including null cursor_path rows for RecordBatches --- dlt/extract/incremental/exceptions.py | 2 -- dlt/extract/incremental/transform.py | 6 +++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dlt/extract/incremental/exceptions.py b/dlt/extract/incremental/exceptions.py index e056098ba6..f299f601d5 100644 --- a/dlt/extract/incremental/exceptions.py +++ b/dlt/extract/incremental/exceptions.py @@ -2,8 +2,6 @@ from dlt.common.typing import TDataItem - - class IncrementalCursorPathMissing(PipeException): def __init__( self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index a92b0c1a0e..d6e123f3f1 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -386,7 +386,11 @@ def __call__( ) ) if self.on_cursor_value_missing == "include": - tbl = pa.concat_tables([tbl, tbl_with_null]) + if isinstance(tbl, pa.RecordBatch): + assert isinstance(tbl_with_null, pa.RecordBatch) + tbl = pa.Table.from_batches([tbl, tbl_with_null]) + else: + tbl = pa.concat_tables([tbl, tbl_with_null]) if len(tbl) == 0: return None, start_out_of_range, end_out_of_range From ca7b91c2e859c825ee9b8c5272ee3cfdaacc1836 Mon Sep 17 00:00:00 2001 From: Willi Date: Wed, 7 Aug 2024 19:35:35 +0530 Subject: [PATCH 24/32] ignores typing errors --- tests/extract/test_incremental.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index d579d9b9ee..17fb6f54cb 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -835,14 +835,14 @@ def some_data( assert e.value.json_path == "created_at" # same thing when run in pipeline - with pytest.raises(PipelineStepFailed) as e: + with pytest.raises(PipelineStepFailed) as e: # type: ignore[assignment] p = dlt.pipeline(pipeline_name=uniq_id()) p.extract(some_data()) if item_type == "object": assert isinstance(e.value.__context__, IncrementalCursorPathMissing) else: assert isinstance(e.value.__context__, IncrementalCursorPathHasValueNone) - assert e.value.__context__.json_path == "created_at" + assert e.value.__context__.json_path == "created_at" # type: ignore[attr-defined] @pytest.mark.parametrize("item_type", ["arrow-table", "arrow-batch", "pandas"]) def test_cursor_path_none_can_raise_on_column_missing(item_type: TestDataItemFormat) -> None: @@ -868,7 +868,7 @@ def some_data( with pytest.raises(PipelineStepFailed) as pip_ex: p = dlt.pipeline(pipeline_name=uniq_id()) p.extract(some_data()) - assert pip_ex.value.__context__.json_path == "created_at" + assert pip_ex.value.__context__.json_path == "created_at" # type: ignore[attr-defined] assert isinstance(pip_ex.value.__context__, IncrementalCursorPathMissing) From d6fde197834f6120c88c59025df0205562052ee1 Mon Sep 17 00:00:00 2001 From: Willi Date: Thu, 8 Aug 2024 15:43:29 +0530 Subject: [PATCH 25/32] fixes import error when arrow is not installed --- dlt/extract/incremental/transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index d6e123f3f1..d218292fe0 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -402,7 +402,7 @@ def __call__( return tbl.to_pandas(), start_out_of_range, end_out_of_range return tbl,
start_out_of_range, end_out_of_range - def _process_null_at_cursor_path(self, tbl: pa.Table) -> Tuple["pa.Table", "pa.Table"]: + def _process_null_at_cursor_path(self, tbl: "pa.Table") -> Tuple["pa.Table", "pa.Table"]: mask = pa.compute.is_valid(tbl[self.cursor_path]) rows_without_null = tbl.filter(mask) rows_with_null = tbl.filter(pa.compute.invert(mask)) From 99c1e3983510b955404ab83e18ffa45e08739b7d Mon Sep 17 00:00:00 2001 From: Willi Date: Thu, 8 Aug 2024 22:37:52 +0530 Subject: [PATCH 26/32] code formatting --- tests/extract/test_incremental.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 17fb6f54cb..a64e8bcdaf 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -835,7 +835,7 @@ def some_data( assert e.value.json_path == "created_at" # same thing when run in pipeline - with pytest.raises(PipelineStepFailed) as e: # type: ignore[assignment] + with pytest.raises(PipelineStepFailed) as e: # type: ignore[assignment] p = dlt.pipeline(pipeline_name=uniq_id()) p.extract(some_data()) From 06aff12969c5284218f5c73c67c424908f6050d1 Mon Sep 17 00:00:00 2001 From: Willi Date: Thu, 8 Aug 2024 22:37:57 +0530 Subject: [PATCH 27/32] dissatisfying headstand to pass tests/pipeline/test_pipeline_extra.py::test_dump_trace_freeze_exception --- dlt/extract/incremental/transform.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index d218292fe0..79a706269a 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -138,10 +138,21 @@ def find_cursor_value(self, row: TDataItem) -> Any: # ignores the other found values, e.g. when the path is $data.items[*].created_at row_value = cursor_values[0] else: - row_value = row.get(self.cursor_path) + row_value = None + try: + row_value = row.get(self.cursor_path) + except AttributeError: + # just to pass tests/pipeline/test_pipeline_extra.py::test_dump_trace_freeze_exception + # Wouldn't it be preferable to tell the exact exception? + pass if row_value is None: if self.on_cursor_value_missing == "raise": - if self.cursor_path not in row.keys(): + has_field = None + if isinstance(row, dict): + has_field = self.cursor_path in row.keys() + else: + has_field = hasattr(row, self.cursor_path) + if not has_field: raise IncrementalCursorPathMissing( self.resource_name, self.cursor_path, row ) From a9d1775eb313c5e25ecd20e09ce0cb8c051a3f1e Mon Sep 17 00:00:00 2001 From: Willi Date: Tue, 20 Aug 2024 16:11:21 +0530 Subject: [PATCH 28/32] extracts low-level code to method --- dlt/extract/incremental/transform.py | 38 ++++++++++++++++++---------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 79a706269a..8f2ff4309f 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -118,7 +118,8 @@ class JsonIncremental(IncrementalTransform): def find_cursor_value(self, row: TDataItem) -> Any: """Finds value in row at cursor defined by self.cursor_path. - Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict + Will use compiled JSONPath if present. + Otherwise, reverts to field access if row is dict, Pydantic model, or another class.
""" if self._compiled_cursor_path: cursor_values = find_values(self._compiled_cursor_path, row) @@ -138,20 +139,10 @@ def find_cursor_value(self, row: TDataItem) -> Any: # ignores the other found values, e.g. when the path is $data.items[*].created_at row_value = cursor_values[0] else: - row_value = None - try: - row_value = row.get(self.cursor_path) - except AttributeError: - # just to pass tests/pipeline/test_pipeline_extra.py::test_dump_trace_freeze_exception - # Wouldn't it be preferrable to tell the exact exception? - pass + row_value = self._value_at_cursor_path(row) if row_value is None: if self.on_cursor_value_missing == "raise": - has_field = None - if isinstance(row, dict): - has_field = self.cursor_path in row.keys() - else: - has_field = hasattr(row, self.cursor_path) + has_field = self._contains_cursor_path(row) if not has_field: raise IncrementalCursorPathMissing( self.resource_name, self.cursor_path, row @@ -165,6 +156,27 @@ def find_cursor_value(self, row: TDataItem) -> Any: return row_value + + def _value_at_cursor_path(self, row: TDataItem): + row_value = None + try: + row_value = row.get(self.cursor_path) + except AttributeError: + # supports Pydantic models and other classes + row_value = getattr(row, self.cursor_path) + return row_value + + + def _contains_cursor_path(self, row: TDataItem) -> bool: + has_field = None + try: + has_field = self.cursor_path in row.keys() + except AttributeError: + # supports Pydantic models and other classes + has_field = hasattr(row, self.cursor_path) + return has_field + + def __call__( self, row: TDataItem, From 605461c333e8ad45dde39430cf20aba0071bf94c Mon Sep 17 00:00:00 2001 From: Willi Date: Tue, 20 Aug 2024 16:12:18 +0530 Subject: [PATCH 29/32] simplifies code a bit --- dlt/extract/incremental/transform.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 8f2ff4309f..6a1514ac5b 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -156,26 +156,19 @@ def find_cursor_value(self, row: TDataItem) -> Any: return row_value - - def _value_at_cursor_path(self, row: TDataItem): - row_value = None + def _value_at_cursor_path(self, row: TDataItem) -> Optional[TDataItem]: try: - row_value = row.get(self.cursor_path) + return row.get(self.cursor_path) except AttributeError: # supports Pydantic models and other classes - row_value = getattr(row, self.cursor_path) - return row_value - + return getattr(row, self.cursor_path) def _contains_cursor_path(self, row: TDataItem) -> bool: - has_field = None try: - has_field = self.cursor_path in row.keys() + return self.cursor_path in row.keys() except AttributeError: # supports Pydantic models and other classes - has_field = hasattr(row, self.cursor_path) - return has_field - + return hasattr(row, self.cursor_path) def __call__( self, From 0b267a292b24f9d5ced28f88d189ae60529af6b7 Mon Sep 17 00:00:00 2001 From: Willi Date: Tue, 20 Aug 2024 18:05:40 +0530 Subject: [PATCH 30/32] adds failing test with overlapping incremental cursor values. 
--- tests/extract/test_incremental.py | 41 +++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index a64e8bcdaf..cb95b39f2b 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -669,6 +669,47 @@ def some_data( assert s["last_value"] == 2 +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_does_not_include_overlapping_records( + item_type: TestDataItemFormat, +) -> None: + @dlt.resource + def some_data( + invocation: int, + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include"), + ): + if invocation == 1: + yield data_to_item_format( + item_type, + [ + {"id": 1, "created_at": None}, + {"id": 2, "created_at": 1}, + {"id": 3, "created_at": 2}, + ], + ) + elif invocation == 2: + yield data_to_item_format( + item_type, + [ + {"id": 4, "created_at": 1}, + {"id": 5, "created_at": None}, + {"id": 6, "created_at": 3}, + ], + ) + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(1), destination="duckdb") + p.run(some_data(2), destination="duckdb") + + assert_query_data(p, "select id from some_data order by id", [1, 2, 3, 5, 6]) + assert_query_data(p, "select created_at from some_data order by created_at", [1, 2, 3, None, None]) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 3 + + @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2( item_type: TestDataItemFormat, From 9786abd9cee325daa56a96d5815a0ca1802eae2f Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 21 Aug 2024 13:50:18 +0200 Subject: [PATCH 31/32] drops temp arrow index before concat and any returns --- dlt/extract/incremental/transform.py | 11 ++++++----- tests/extract/test_incremental.py | 4 +++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 6a1514ac5b..3d11fb8a3c 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -401,6 +401,11 @@ def __call__( ) ) ) + + # drop the temp unique index before concat and returning + if "_dlt_index" in tbl.schema.names: + tbl = pyarrow.remove_columns(tbl, ["_dlt_index"]) + if self.on_cursor_value_missing == "include": if isinstance(tbl, pa.RecordBatch): assert isinstance(tbl_with_null, pa.RecordBatch) @@ -410,12 +415,8 @@ def __call__( if len(tbl) == 0: return None, start_out_of_range, end_out_of_range - try: - tbl = pyarrow.remove_columns(tbl, ["_dlt_index"]) - except KeyError: - pass if is_pandas: - return tbl.to_pandas(), start_out_of_range, end_out_of_range + tbl = tbl.to_pandas() return tbl, start_out_of_range, end_out_of_range def _process_null_at_cursor_path(self, tbl: "pa.Table") -> Tuple["pa.Table", "pa.Table"]: diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index cb95b39f2b..bdad8cd5e1 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -702,7 +702,9 @@ def some_data( p.run(some_data(2), destination="duckdb") assert_query_data(p, "select id from some_data order by id", [1, 2, 3, 5, 6]) - assert_query_data(p, "select created_at from some_data order by created_at", [1, 2, 3, None, None]) + assert_query_data( + p, "select created_at from some_data order by created_at", [1, 2, 3, None, None] + ) 
s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ "created_at" From 4ce384a610e6e748b72567f86343e34480ac4dee Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Fri, 30 Aug 2024 14:06:45 +0200 Subject: [PATCH 32/32] handles non existing attr when gettin row_value, simplifies getter code --- dlt/extract/incremental/transform.py | 72 ++++++++++----------------- tests/pipeline/test_pipeline_extra.py | 1 - 2 files changed, 27 insertions(+), 46 deletions(-) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 70e6b80ba2..eb448d4266 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -1,5 +1,5 @@ from datetime import datetime # noqa: I251 -from typing import Any, Optional, Set, Tuple, List +from typing import Any, Optional, Set, Tuple, List, Type from dlt.common.exceptions import MissingDependencyException from dlt.common.utils import digest128 @@ -122,54 +122,36 @@ def find_cursor_value(self, row: TDataItem) -> Any: Will use compiled JSONPath if present. Otherwise, reverts to field access if row is dict, Pydantic model, or of other class. """ + key_exc: Type[Exception] = IncrementalCursorPathHasValueNone if self._compiled_cursor_path: - cursor_values = find_values(self._compiled_cursor_path, row) - if cursor_values == []: - if self.on_cursor_value_missing == "raise": - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - elif self.on_cursor_value_missing == "exclude": - return None - elif None in cursor_values: - if self.on_cursor_value_missing == "raise": - raise IncrementalCursorPathHasValueNone( - self.resource_name, self.cursor_path, row - ) - elif self.on_cursor_value_missing == "exclude": - return None - # ignores the other found values, e.g. 
when the path is $data.items[*].created_at - row_value = cursor_values[0] + try: + row_value = find_values(self._compiled_cursor_path, row)[0] + except IndexError: + # empty list so raise a proper exception + row_value = None + key_exc = IncrementalCursorPathMissing else: - row_value = self._value_at_cursor_path(row) - if row_value is None: - if self.on_cursor_value_missing == "raise": - has_field = self._contains_cursor_path(row) - if not has_field: - raise IncrementalCursorPathMissing( - self.resource_name, self.cursor_path, row - ) - else: - raise IncrementalCursorPathHasValueNone( - self.resource_name, self.cursor_path, row - ) - elif self.on_cursor_value_missing == "exclude": - return None - - return row_value - - def _value_at_cursor_path(self, row: TDataItem) -> Optional[TDataItem]: - try: - return row.get(self.cursor_path) - except AttributeError: - # supports Pydantic models and other classes - return getattr(row, self.cursor_path) + try: + try: + row_value = row[self.cursor_path] + except TypeError: + # supports Pydantic models and other classes + row_value = getattr(row, self.cursor_path) + except (KeyError, AttributeError): + # attr not found so raise a proper exception + row_value = None + key_exc = IncrementalCursorPathMissing + + # if we have a value - return it + if row_value is not None: + return row_value - def _contains_cursor_path(self, row: TDataItem) -> bool: - try: - return self.cursor_path in row.keys() - except AttributeError: - # supports Pydantic models and other classes - return hasattr(row, self.cursor_path) + if self.on_cursor_value_missing == "raise": + # raise missing path or None value exception + raise key_exc(self.resource_name, self.cursor_path, row) + elif self.on_cursor_value_missing == "exclude": + return None def __call__( self, diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py index c757959bec..af3a6c239e 100644 --- a/tests/pipeline/test_pipeline_extra.py +++ b/tests/pipeline/test_pipeline_extra.py @@ -246,7 +246,6 @@ class TestRow(BaseModel): example_string: str # yield model in resource so incremental fails when looking for "id" - # TODO: support pydantic models in incremental @dlt.resource(name="table_name", primary_key="id", write_disposition="replace") def generate_rows_incremental(