From b84c6e3e379652e2050835a0ab6844b7389d1bc7 Mon Sep 17 00:00:00 2001
From: Willi
Date: Thu, 11 Jul 2024 19:38:52 +0530
Subject: [PATCH] implements ability to raise on cursor_path = None or to
 include the row. Fails for pandas & pyarrow

implements handling of a None value at cursor_path for nested JSON paths
---
 dlt/extract/incremental/__init__.py           |  14 +-
 dlt/extract/incremental/exceptions.py         |  13 +-
 dlt/extract/incremental/transform.py          |  26 ++-
 dlt/extract/incremental/typing.py             |   3 +-
 .../docs/general-usage/incremental-loading.md |   6 +-
 tests/extract/test_incremental.py             | 180 +++++++++++++-----
 6 files changed, 186 insertions(+), 56 deletions(-)

diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py
index 11f989e0b2..ee44929fd8 100644
--- a/dlt/extract/incremental/__init__.py
+++ b/dlt/extract/incremental/__init__.py
@@ -35,7 +35,12 @@
     IncrementalCursorPathMissing,
     IncrementalPrimaryKeyMissing,
 )
-from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc
+from dlt.extract.incremental.typing import (
+    IncrementalColumnState,
+    TCursorValue,
+    LastValueFunc,
+    OnCursorValueNone,
+)
 from dlt.extract.pipe import Pipe
 from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform
 from dlt.extract.incremental.transform import (
@@ -81,7 +86,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
     >>> info = p.run(r, destination="duckdb")

     Args:
-        cursor_path: The name or a JSON path to an cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
+        cursor_path: The name or a JSON path to a cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
         initial_value: Optional value used for `last_value` when no state is available, e.g. on the first run of the pipeline. If not provided `last_value` will be `None` on the first run.
         last_value_func: Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor vals from currently processing items. Default is `max`
         primary_key: Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass empty tuple to disable unique checks
@@ -95,6 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
             specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class.
             The values passed explicitly to Incremental will be ignored.
             Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded
+        on_cursor_value_none: Specify what happens when a record has `None` at the cursor_path: "raise" an exception (default) or "include" the row
     """

     # this is config/dataclass so declare members
@@ -104,6 +110,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
     end_value: Optional[Any] = None
     row_order: Optional[TSortOrder] = None
     allow_external_schedulers: bool = False
+    on_cursor_value_none: OnCursorValueNone = "raise"

     # incremental acting as empty
     EMPTY: ClassVar["Incremental[Any]"] = None
@@ -117,6 +124,7 @@ def __init__(
         end_value: Optional[TCursorValue] = None,
         row_order: Optional[TSortOrder] = None,
         allow_external_schedulers: bool = False,
+        on_cursor_value_none: OnCursorValueNone = "raise",
     ) -> None:
         # make sure that path is valid
         if cursor_path:
@@ -132,6 +140,7 @@ def __init__(
         self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key
         self.row_order = row_order
         self.allow_external_schedulers = allow_external_schedulers
+        self.on_cursor_value_none = on_cursor_value_none
         self._cached_state: IncrementalColumnState = None
         """State dictionary cached on first access"""
@@ -170,6 +179,7 @@ def _make_transforms(self) -> None:
                 self.last_value_func,
                 self._primary_key,
                 set(self._cached_state["unique_hashes"]),
+                self.on_cursor_value_none,
             )

     @classmethod
diff --git a/dlt/extract/incremental/exceptions.py b/dlt/extract/incremental/exceptions.py
index e318a028dc..1eebccb37c 100644
--- a/dlt/extract/incremental/exceptions.py
+++ b/dlt/extract/incremental/exceptions.py
@@ -8,7 +8,18 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N
         self.item = item
         msg = (
             msg
-            or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database."
+            or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in the database."
         )
         super().__init__(pipe_name, msg)
+
+
+class IncrementalCursorPathHasValueNone(PipeException):
+    def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None:
+        self.json_path = json_path
+        self.item = item
+        msg = (
+            msg
+            or f"Cursor element with JSON path {json_path} has the value `None` in extracted data item. All data items must contain a value != None. Construct the incremental with on_cursor_value_none='include' if you want to include such rows."
+        )
+        super().__init__(pipe_name, msg)
diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py
index d8f6b79b49..014b4fdf91 100644
--- a/dlt/extract/incremental/transform.py
+++ b/dlt/extract/incremental/transform.py
@@ -10,8 +10,9 @@
 from dlt.extract.incremental.exceptions import (
     IncrementalCursorPathMissing,
     IncrementalPrimaryKeyMissing,
+    IncrementalCursorPathHasValueNone,
 )
-from dlt.extract.incremental.typing import TCursorValue, LastValueFunc
+from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueNone
 from dlt.extract.utils import resolve_column_value
 from dlt.extract.items import TTableHintTemplate
 from dlt.common.schema.typing import TColumnNames
@@ -54,6 +55,7 @@ def __init__(
         last_value_func: LastValueFunc[TCursorValue],
         primary_key: Optional[TTableHintTemplate[TColumnNames]],
         unique_hashes: Set[str],
+        on_cursor_value_none: OnCursorValueNone = "raise",
     ) -> None:
         self.resource_name = resource_name
         self.cursor_path = cursor_path
@@ -66,6 +68,7 @@ def __init__(
         self.primary_key = primary_key
         self.unique_hashes = unique_hashes
         self.start_unique_hashes = set(unique_hashes)
+        self.on_cursor_value_none = on_cursor_value_none

         # compile jsonpath
         self._compiled_cursor_path = compile_path(cursor_path)
@@ -120,14 +123,25 @@ def find_cursor_value(self, row: TDataItem) -> Any:
         if self.cursor_path not in row.keys() and not self._compiled_cursor_path:
             raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)

+        row_value = self._value_at_cursor_path(row)
+
+        if self.on_cursor_value_none == "raise" and row_value is None:
+            raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row)
+
+        return row_value
+
+    def _value_at_cursor_path(self, row: TDataItem) -> Any:
         row_value = row.get(self.cursor_path, None)
         if self._compiled_cursor_path:
-            row_values = find_values(self._compiled_cursor_path, row)
-            if not row_values:
+            cursor_values = find_values(self._compiled_cursor_path, row)
+            if not cursor_values:
                 raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
-            row_value = row_values[0]
-
+            elif None in cursor_values and self.on_cursor_value_none == "raise":
+                raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row)
+            else:
+                # use the first match and ignore any other found values, e.g. when the path is $data.items[*].created_at
+                row_value = cursor_values[0]
         return row_value

     def __call__(
@@ -136,7 +150,7 @@ def __call__(
     ) -> Tuple[Optional[TDataItem], bool, bool]:
         """
         Returns:
-            Tuple (row, start_out_of_range, end_out_of_range) where row is either the data item or `None` if it is completely filtered out
+            Tuple (row, is_below_initial_value, is_above_end_value) where row is either the data item or `None` if it is completely filtered out
         """
         if row is None:
             return row, False, False
diff --git a/dlt/extract/incremental/typing.py b/dlt/extract/incremental/typing.py
index 9cec97d34d..14a1435a8a 100644
--- a/dlt/extract/incremental/typing.py
+++ b/dlt/extract/incremental/typing.py
@@ -1,8 +1,9 @@
-from typing import TypedDict, Optional, Any, List, TypeVar, Callable, Sequence
+from typing import TypedDict, Optional, Any, List, Literal, TypeVar, Callable, Sequence

 TCursorValue = TypeVar("TCursorValue", bound=Any)
 LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
+OnCursorValueNone = Literal["raise", "include"]


 class IncrementalColumnState(TypedDict):
diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index 2a75cbcfad..b26b5ba067 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -41,7 +41,7 @@ user's profile Stateless data cannot change - for example, a recorded event, suc

 Because stateless data does not need to be updated, we can just append it.

-For stateful data, comes a second question - Can I extract it incrementally from the source? If yes, you should use [slowly changing dimensions (Type-2)](#scd2-strategy), which allow you to maintain historical records of data changes over time.
+For stateful data, comes a second question - Can I extract it incrementally from the source? If yes, you should use [slowly changing dimensions (Type-2)](#scd2-strategy), which allow you to maintain historical records of data changes over time.
 If not, then we need to replace the entire data set. If however we can request the data
 incrementally such as "all users added or modified since yesterday" then we can simply apply changes
 to our existing
@@ -657,7 +657,7 @@ than `end_value`.

 :::caution
 In rare cases when you use Incremental with a transformer, `dlt` will not be able to automatically close
-generator associated with a row that is out of range. You can still use still call `can_close()` method on
+generator associated with a row that is out of range. You can still call the `can_close()` method on
 incremental and exit yield loop when true.
 :::

@@ -1077,4 +1077,4 @@ sources:
 }
 ```

-Verify that the `last_value` is updated between pipeline runs.
\ No newline at end of file
+Verify that the `last_value` is updated between pipeline runs.
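To make the new `on_cursor_value_none` option concrete before the tests, here is a minimal usage sketch; the resource, its data, and the pipeline name are illustrative and not part of the patch:

```py
import dlt

@dlt.resource
def events(
    created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")
):
    # with "include", the row whose created_at is None is kept,
    # but a None cursor value never becomes the stored last_value
    yield [
        {"id": 1, "created_at": 1},
        {"id": 2, "created_at": None},
        {"id": 3, "created_at": 2},
    ]

# pipeline name and destination are illustrative; any destination behaves the same
pipeline = dlt.pipeline(pipeline_name="cursor_none_demo", destination="duckdb")
pipeline.run(events())
# the stored cursor state now holds last_value == 2
```

With the default `on_cursor_value_none="raise"`, the same resource would abort extraction with `IncrementalCursorPathHasValueNone`, as the tests below verify.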
diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py
index 16ad0a231f..273dcd2ff3 100644
--- a/tests/extract/test_incremental.py
+++ b/tests/extract/test_incremental.py
@@ -31,6 +31,7 @@
 from dlt.extract.incremental.exceptions import (
     IncrementalCursorPathMissing,
     IncrementalPrimaryKeyMissing,
+    IncrementalCursorPathHasValueNone,
 )
 from dlt.pipeline.exceptions import PipelineStepFailed

@@ -635,9 +636,28 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")):

 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
-def test_cursor_path_none_updates_incremental_cursor(item_type: TestDataItemFormat) -> None:
-    last_values = []
+def test_cursor_path_none_updates_incremental_cursor_1(item_type: TestDataItemFormat) -> None:
+    data = [
+        {"id": 1, "created_at": None},
+        {"id": 2, "created_at": 1},
+        {"id": 3, "created_at": 2},
+    ]
+    source_items = data_to_item_format(item_type, data)
+
+    @dlt.resource
+    def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")):
+        yield source_items
+
+    p = dlt.pipeline(pipeline_name=uniq_id())
+    p.extract(some_data())
+    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
+        "created_at"
+    ]
+    assert s["last_value"] == 2
+
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+def test_cursor_path_none_updates_incremental_cursor_2(item_type: TestDataItemFormat) -> None:
     data = [
         {"id": 1, "created_at": 1},
         {"id": 2, "created_at": None},
@@ -646,44 +666,119 @@ def test_cursor_path_none_updates_incremental_cursor(item_type: TestDataItemForm
     source_items = data_to_item_format(item_type, data)

     @dlt.resource
-    def some_data(created_at=dlt.sources.incremental("created_at")):
-        last_values.append(created_at.last_value)
+    def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")):
         yield source_items

     p = dlt.pipeline(pipeline_name=uniq_id())
     p.extract(some_data())
-    assert last_values == [None]
-
-    p.extract(some_data())
-    assert last_values == [None, 2]
+    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
+        "created_at"
+    ]
+    assert s["last_value"] == 2


 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
-def test_cursor_path_none_updates_incremental_cursor_2(item_type: TestDataItemFormat) -> None:
-    last_values = []
-
+def test_cursor_path_none_updates_incremental_cursor_3(item_type: TestDataItemFormat) -> None:
     data = [
-        {"id": 1, "created_at": None},
+        {"id": 1, "created_at": 1},
         {"id": 2, "created_at": 2},
-        {"id": 3, "created_at": 2},
+        {"id": 3, "created_at": None},
     ]
     source_items = data_to_item_format(item_type, data)

     @dlt.resource
-    def some_data(created_at=dlt.sources.incremental("created_at")):
-        last_values.append(created_at.last_value)
+    def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")):
         yield source_items

     p = dlt.pipeline(pipeline_name=uniq_id())
     p.extract(some_data())
-    assert last_values == [None]
+    s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][
+        "created_at"
+    ]
+    assert s["last_value"] == 2

-    p.extract(some_data())
-    assert last_values == [None, 2]
+
+def test_cursor_path_none_can_raise_on_none() -> None:
+    # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails
+    source_items = [
+        {"id": 1, "created_at": 1},
+        {"id": 2, "created_at": None},
+        {"id": 3, "created_at": 2},
+    ]
+
+    @dlt.resource
+    def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="raise")):
+        yield source_items
+
+    with pytest.raises(IncrementalCursorPathHasValueNone) as py_ex:
+        list(some_data())
+    assert py_ex.value.json_path == "created_at"
+
+    # same thing when run in pipeline
+    with pytest.raises(PipelineStepFailed) as pip_ex:
+        p = dlt.pipeline(pipeline_name=uniq_id())
+        p.extract(some_data())
+
+    assert isinstance(pip_ex.value.__context__, IncrementalCursorPathHasValueNone)
+    assert pip_ex.value.__context__.json_path == "created_at"
+
+
+def test_cursor_path_none_nested_can_raise_on_none_1() -> None:
+    # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails
+    @dlt.resource
+    def some_data(
+        created_at=dlt.sources.incremental("data.items[0].created_at", on_cursor_value_none="raise")
+    ):
+        yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}}
+
+    with pytest.raises(IncrementalCursorPathHasValueNone) as e:
+        list(some_data())
+    assert e.value.json_path == "data.items[0].created_at"
+
+
+def test_cursor_path_none_nested_can_raise_on_none_2() -> None:
+    # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails
+    @dlt.resource
+    def some_data(
+        created_at=dlt.sources.incremental("data.items[*].created_at", on_cursor_value_none="raise")
+    ):
+        yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}}
+
+    with pytest.raises(IncrementalCursorPathHasValueNone) as e:
+        list(some_data())
+    assert e.value.json_path == "data.items[*].created_at"
+
+
+def test_cursor_path_none_nested_can_include_on_none_1() -> None:
+    # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails
+    @dlt.resource
+    def some_data(
+        created_at=dlt.sources.incremental(
+            "data.items[*].created_at", on_cursor_value_none="include"
+        )
+    ):
+        yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}}
+
+    results = list(some_data())
+    assert results[0]["data"]["items"] == [{"created_at": None}, {"created_at": 1}]
+
+
+def test_cursor_path_none_nested_can_include_on_none_2() -> None:
+    # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails
+    @dlt.resource
+    def some_data(
+        created_at=dlt.sources.incremental(
+            "data.items[0].created_at", on_cursor_value_none="include"
+        )
+    ):
+        yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}}
+
+    results = list(some_data())
+    assert results[0]["data"]["items"] == [{"created_at": None}, {"created_at": 1}]


 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
-def test_cursor_path_none_does_not_discard_row(item_type: TestDataItemFormat) -> None:
+def test_cursor_path_none_can_include_on_none_1(item_type: TestDataItemFormat) -> None:
     data = [
         {"id": 1, "created_at": 1},
         {"id": 2, "created_at": None},
@@ -692,24 +787,15 @@ def test_cursor_path_none_does_not_discard_row(item_type: TestDataItemFormat) ->
     source_items = data_to_item_format(item_type, data)

     @dlt.resource
-    def some_data(created_at=dlt.sources.incremental("created_at")):
+    def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")):
         yield source_items

-    result = list(some_data())
-    if item_type == "object":
-        assert len(result) == 3
-    else:
-        assert len(result[0]) == 3
-
-    result_2 = list(some_data())
-    if item_type == "object":
-        assert len(result_2) == 3
-    else:
-        assert len(result_2[0]) == 3
+    result = data_item_to_list(item_type, list(some_data()))
+    assert data_item_length(result) == 3


 @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
-def test_cursor_path_none_does_not_discard_row_2(item_type: TestDataItemFormat) -> None:
+def test_cursor_path_none_can_include_on_none_2(item_type: TestDataItemFormat) -> None:
     data = [
         {"id": 1, "created_at": None},
         {"id": 2, "created_at": 1},
@@ -718,21 +804,29 @@ def test_cursor_path_none_does_not_discard_row_2(item_type: TestDataItemFormat)
     source_items = data_to_item_format(item_type, data)

     @dlt.resource
-    def some_data(created_at=dlt.sources.incremental("created_at")):
+    def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")):
         yield source_items

-    result = list(some_data())
-    if item_type == "object":
-        assert len(result) == 3
-    else:
-        assert len(result[0]) == 3
+    result = data_item_to_list(item_type, list(some_data()))
+    assert data_item_length(result) == 3

-    result_2 = list(some_data())
-    if item_type == "object":
-        assert len(result_2) == 3
-    else:
-        assert len(result_2[0]) == 3
+
+@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
+def test_cursor_path_none_can_include_on_none_3(item_type: TestDataItemFormat) -> None:
+    data = [
+        {"id": 1, "created_at": 1},
+        {"id": 2, "created_at": 1},
+        {"id": 3, "created_at": None},
+    ]
+    source_items = data_to_item_format(item_type, data)
+
+    @dlt.resource
+    def some_data(created_at=dlt.sources.incremental("created_at", on_cursor_value_none="include")):
+        yield source_items
+
+    result = list(some_data())
+    values = data_item_to_list(item_type, result)
+    assert data_item_length(values) == 3


 def test_json_path_cursor() -> None:
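The default "raise" mode surfaces the new exception as soon as a `None` cursor value is seen, so calling code can catch it directly when iterating a resource, exactly as the tests above do with `pytest.raises`. A minimal sketch of that pattern (resource name and data are illustrative):

```py
import dlt
from dlt.extract.incremental.exceptions import IncrementalCursorPathHasValueNone

@dlt.resource
def users(updated_at=dlt.sources.incremental("updated_at")):
    # on_cursor_value_none defaults to "raise", so this row triggers the exception
    yield [{"id": 1, "updated_at": None}]

try:
    list(users())
except IncrementalCursorPathHasValueNone as exc:
    # json_path names the offending cursor path, as the tests assert
    print(f"cursor value is None at {exc.json_path!r}")
```

When the same resource runs inside a pipeline, the exception arrives wrapped in `PipelineStepFailed` and is available on its `__context__`, as `test_cursor_path_none_can_raise_on_none` shows.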