From 1723faa92717090f2c0d28f471d3772f647130e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Willi=20M=C3=BCller?= Date: Fri, 30 Aug 2024 23:01:43 +0530 Subject: [PATCH] Fix/1571 Incremental: Optionally raise, load, or ignore raise records with cursor_path missing or None value (#1576) * allows specification of what happens on cursor_path missing or cursor_path having the value None: raise differentiated exceptions, exclude row, or include row. * Documents handling None values at the incremental cursor * fixes incremental extract crashing if one record has cursor_path = None * test that add_map can be used to transform items before the incremental function is called * Unifies treating of None values for python Objects (including pydantic), pandas, and arrow --------- Co-authored-by: Marcin Rudolf --- dlt/extract/incremental/__init__.py | 18 +- dlt/extract/incremental/exceptions.py | 19 +- dlt/extract/incremental/transform.py | 89 +++- dlt/extract/incremental/typing.py | 3 +- .../docs/general-usage/incremental-loading.md | 69 ++- tests/extract/test_incremental.py | 460 +++++++++++++++++- tests/pipeline/test_pipeline_extra.py | 1 - 7 files changed, 624 insertions(+), 35 deletions(-) diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index c1117370b5..343a737c07 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -35,7 +35,12 @@ IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, ) -from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc +from dlt.extract.incremental.typing import ( + IncrementalColumnState, + TCursorValue, + LastValueFunc, + OnCursorValueMissing, +) from dlt.extract.pipe import Pipe from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform from dlt.extract.incremental.transform import ( @@ -81,7 +86,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa >>> info = p.run(r, destination="duckdb") Args: - cursor_path: The name or a JSON path to an cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database. + cursor_path: The name or a JSON path to a cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database. initial_value: Optional value used for `last_value` when no state is available, e.g. on the first run of the pipeline. If not provided `last_value` will be `None` on the first run. last_value_func: Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor vals from currently processing items. Default is `max` primary_key: Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass empty tuple to disable unique checks @@ -95,6 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class. The values passed explicitly to Incremental will be ignored. Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded + on_cursor_value_missing: Specify what happens when the cursor_path does not exist in a record or a record has `None` at the cursor_path: raise, include, exclude """ # this is config/dataclass so declare members @@ -104,6 +110,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa end_value: Optional[Any] = None row_order: Optional[TSortOrder] = None allow_external_schedulers: bool = False + on_cursor_value_missing: OnCursorValueMissing = "raise" # incremental acting as empty EMPTY: ClassVar["Incremental[Any]"] = None @@ -118,6 +125,7 @@ def __init__( end_value: Optional[TCursorValue] = None, row_order: Optional[TSortOrder] = None, allow_external_schedulers: bool = False, + on_cursor_value_missing: OnCursorValueMissing = "raise", ) -> None: # make sure that path is valid if cursor_path: @@ -133,6 +141,11 @@ def __init__( self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key self.row_order = row_order self.allow_external_schedulers = allow_external_schedulers + if on_cursor_value_missing not in ["raise", "include", "exclude"]: + raise ValueError( + f"Unexpected argument for on_cursor_value_missing. Got {on_cursor_value_missing}" + ) + self.on_cursor_value_missing = on_cursor_value_missing self._cached_state: IncrementalColumnState = None """State dictionary cached on first access""" @@ -171,6 +184,7 @@ def _make_transforms(self) -> None: self.last_value_func, self._primary_key, set(self._cached_state["unique_hashes"]), + self.on_cursor_value_missing, ) @classmethod diff --git a/dlt/extract/incremental/exceptions.py b/dlt/extract/incremental/exceptions.py index a5f94c2974..973d3b6585 100644 --- a/dlt/extract/incremental/exceptions.py +++ b/dlt/extract/incremental/exceptions.py @@ -5,12 +5,27 @@ class IncrementalCursorPathMissing(PipeException): - def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None: + def __init__( + self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None + ) -> None: + self.json_path = json_path + self.item = item + msg = ( + msg + or f"Cursor element with JSON path `{json_path}` was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in database." + ) + super().__init__(pipe_name, msg) + + +class IncrementalCursorPathHasValueNone(PipeException): + def __init__( + self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None + ) -> None: self.json_path = json_path self.item = item msg = ( msg - or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database." + or f"Cursor element with JSON path `{json_path}` has the value `None` in extracted data item. All data items must contain a value != None. Construct the incremental with on_cursor_value_none='include' if you want to include such rows" ) super().__init__(pipe_name, msg) diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 0ac9fdf520..eb448d4266 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -1,5 +1,5 @@ -from datetime import datetime, date # noqa: I251 -from typing import Any, Optional, Set, Tuple, List +from datetime import datetime # noqa: I251 +from typing import Any, Optional, Set, Tuple, List, Type from dlt.common.exceptions import MissingDependencyException from dlt.common.utils import digest128 @@ -11,8 +11,9 @@ IncrementalCursorInvalidCoercion, IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, + IncrementalCursorPathHasValueNone, ) -from dlt.extract.incremental.typing import TCursorValue, LastValueFunc +from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing from dlt.extract.utils import resolve_column_value from dlt.extract.items import TTableHintTemplate from dlt.common.schema.typing import TColumnNames @@ -55,6 +56,7 @@ def __init__( last_value_func: LastValueFunc[TCursorValue], primary_key: Optional[TTableHintTemplate[TColumnNames]], unique_hashes: Set[str], + on_cursor_value_missing: OnCursorValueMissing = "raise", ) -> None: self.resource_name = resource_name self.cursor_path = cursor_path @@ -67,6 +69,7 @@ def __init__( self.primary_key = primary_key self.unique_hashes = unique_hashes self.start_unique_hashes = set(unique_hashes) + self.on_cursor_value_missing = on_cursor_value_missing # compile jsonpath self._compiled_cursor_path = compile_path(cursor_path) @@ -116,21 +119,39 @@ class JsonIncremental(IncrementalTransform): def find_cursor_value(self, row: TDataItem) -> Any: """Finds value in row at cursor defined by self.cursor_path. - Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict + Will use compiled JSONPath if present. + Otherwise, reverts to field access if row is dict, Pydantic model, or of other class. """ - row_value: Any = None + key_exc: Type[Exception] = IncrementalCursorPathHasValueNone if self._compiled_cursor_path: - row_values = find_values(self._compiled_cursor_path, row) - if row_values: - row_value = row_values[0] + # ignores the other found values, e.g. when the path is $data.items[*].created_at + try: + row_value = find_values(self._compiled_cursor_path, row)[0] + except IndexError: + # empty list so raise a proper exception + row_value = None + key_exc = IncrementalCursorPathMissing else: try: - row_value = row[self.cursor_path] - except Exception: - pass - if row_value is None: - raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row) - return row_value + try: + row_value = row[self.cursor_path] + except TypeError: + # supports Pydantic models and other classes + row_value = getattr(row, self.cursor_path) + except (KeyError, AttributeError): + # attr not found so raise a proper exception + row_value = None + key_exc = IncrementalCursorPathMissing + + # if we have a value - return it + if row_value is not None: + return row_value + + if self.on_cursor_value_missing == "raise": + # raise missing path or None value exception + raise key_exc(self.resource_name, self.cursor_path, row) + elif self.on_cursor_value_missing == "exclude": + return None def __call__( self, @@ -144,6 +165,12 @@ def __call__( return row, False, False row_value = self.find_cursor_value(row) + if row_value is None: + if self.on_cursor_value_missing == "exclude": + return None, False, False + else: + return row, False, False + last_value = self.last_value last_value_func = self.last_value_func @@ -299,6 +326,7 @@ def __call__( # TODO: Json path support. For now assume the cursor_path is a column name cursor_path = self.cursor_path + # The new max/min value try: # NOTE: datetimes are always pendulum in UTC @@ -310,11 +338,16 @@ def __call__( self.resource_name, cursor_path, tbl, - f"Column name {cursor_path} was not found in the arrow table. Not nested JSON paths" + f"Column name `{cursor_path}` was not found in the arrow table. Nested JSON paths" " are not supported for arrow tables and dataframes, the incremental cursor_path" " must be a column name.", ) from e + if tbl.schema.field(cursor_path).nullable: + tbl_without_null, tbl_with_null = self._process_null_at_cursor_path(tbl) + + tbl = tbl_without_null + # If end_value is provided, filter to include table rows that are "less" than end_value if self.end_value is not None: try: @@ -396,12 +429,28 @@ def __call__( ) ) + # drop the temp unique index before concat and returning + if "_dlt_index" in tbl.schema.names: + tbl = pyarrow.remove_columns(tbl, ["_dlt_index"]) + + if self.on_cursor_value_missing == "include": + if isinstance(tbl, pa.RecordBatch): + assert isinstance(tbl_with_null, pa.RecordBatch) + tbl = pa.Table.from_batches([tbl, tbl_with_null]) + else: + tbl = pa.concat_tables([tbl, tbl_with_null]) + if len(tbl) == 0: return None, start_out_of_range, end_out_of_range - try: - tbl = pyarrow.remove_columns(tbl, ["_dlt_index"]) - except KeyError: - pass if is_pandas: - return tbl.to_pandas(), start_out_of_range, end_out_of_range + tbl = tbl.to_pandas() return tbl, start_out_of_range, end_out_of_range + + def _process_null_at_cursor_path(self, tbl: "pa.Table") -> Tuple["pa.Table", "pa.Table"]: + mask = pa.compute.is_valid(tbl[self.cursor_path]) + rows_without_null = tbl.filter(mask) + rows_with_null = tbl.filter(pa.compute.invert(mask)) + if self.on_cursor_value_missing == "raise": + if rows_with_null.num_rows > 0: + raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path) + return rows_without_null, rows_with_null diff --git a/dlt/extract/incremental/typing.py b/dlt/extract/incremental/typing.py index 9cec97d34d..a5e2612db4 100644 --- a/dlt/extract/incremental/typing.py +++ b/dlt/extract/incremental/typing.py @@ -1,8 +1,9 @@ -from typing import TypedDict, Optional, Any, List, TypeVar, Callable, Sequence +from typing import TypedDict, Optional, Any, List, Literal, TypeVar, Callable, Sequence TCursorValue = TypeVar("TCursorValue", bound=Any) LastValueFunc = Callable[[Sequence[TCursorValue]], Any] +OnCursorValueMissing = Literal["raise", "include", "exclude"] class IncrementalColumnState(TypedDict): diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 68fc46e6dc..5ff587f20e 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -689,7 +689,7 @@ than `end_value`. :::caution In rare cases when you use Incremental with a transformer, `dlt` will not be able to automatically close -generator associated with a row that is out of range. You can still use still call `can_close()` method on +generator associated with a row that is out of range. You can still call the `can_close()` method on incremental and exit yield loop when true. ::: @@ -907,22 +907,75 @@ Consider the example below for reading incremental loading parameters from "conf ``` `id_after` incrementally stores the latest `cursor_path` value for future pipeline runs. -### Loading NULL values in the incremental cursor field +### Loading when incremental cursor path is missing or value is None/NULL -When loading incrementally with a cursor field, each row is expected to contain a value at the cursor field that is not `None`. -For example, the following source data will raise an error: +You can customize the incremental processing of dlt by setting the parameter `on_cursor_value_missing`. + +When loading incrementally with the default settings, there are two assumptions: +1. each row contains the cursor path +2. each row is expected to contain a value at the cursor path that is not `None`. + +For example, the two following source data will raise an error: ```py @dlt.resource -def some_data(updated_at=dlt.sources.incremental("updated_at")): +def some_data_without_cursor_path(updated_at=dlt.sources.incremental("updated_at")): yield [ {"id": 1, "created_at": 1, "updated_at": 1}, - {"id": 2, "created_at": 2, "updated_at": 2}, + {"id": 2, "created_at": 2}, # cursor field is missing + ] + +list(some_data_without_cursor_path()) + +@dlt.resource +def some_data_without_cursor_value(updated_at=dlt.sources.incremental("updated_at")): + yield [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 3, "created_at": 4, "updated_at": None}, # value at cursor field is None + ] + +list(some_data_without_cursor_value()) +``` + + +To process a data set where some records do not include the incremental cursor path or where the values at the cursor path are `None,` there are the following four options: + +1. Configure the incremental load to raise an exception in case there is a row where the cursor path is missing or has the value `None` using `incremental(..., on_cursor_value_missing="raise")`. This is the default behavior. +2. Configure the incremental load to tolerate the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="include")`. +3. Configure the incremental load to exclude the missing cursor path and `None` values using `incremental(..., on_cursor_value_missing="exclude")`. +4. Before the incremental processing begins: Ensure that the incremental field is present and transform the values at the incremental cursor to a value different from `None`. [See docs below](#transform-records-before-incremental-processing) + +Here is an example of including rows where the incremental cursor value is missing or `None`: +```py +@dlt.resource +def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="include")): + yield [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 2, "created_at": 2}, + {"id": 3, "created_at": 4, "updated_at": None}, + ] + +result = list(some_data()) +assert len(result) == 3 +assert result[1] == {"id": 2, "created_at": 2} +assert result[2] == {"id": 3, "created_at": 4, "updated_at": None} +``` + +If you do not want to import records without the cursor path or where the value at the cursor path is `None` use the following incremental configuration: + +```py +@dlt.resource +def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="exclude")): + yield [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 2, "created_at": 2}, {"id": 3, "created_at": 4, "updated_at": None}, ] -list(some_data()) +result = list(some_data()) +assert len(result) == 1 ``` +### Transform records before incremental processing If you want to load data that includes `None` values you can transform the records before the incremental processing. You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data). @@ -1162,4 +1215,4 @@ sources: } ``` -Verify that the `last_value` is updated between pipeline runs. \ No newline at end of file +Verify that the `last_value` is updated between pipeline runs. diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index c401552fb2..a9867aa54b 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -33,6 +33,7 @@ IncrementalCursorInvalidCoercion, IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, + IncrementalCursorPathHasValueNone, ) from dlt.pipeline.exceptions import PipelineStepFailed @@ -44,6 +45,10 @@ ALL_TEST_DATA_ITEM_FORMATS, ) +from tests.pipeline.utils import assert_query_data + +import pyarrow as pa + @pytest.fixture(autouse=True) def switch_to_fifo(): @@ -167,8 +172,9 @@ def some_data(created_at=dlt.sources.incremental("created_at")): p = dlt.pipeline(pipeline_name=uniq_id()) p.extract(some_data()) - p.extract(some_data()) + assert values == [None] + p.extract(some_data()) assert values == [None, 5] @@ -635,6 +641,458 @@ def some_data(last_timestamp=dlt.sources.incremental("item.timestamp")): assert pip_ex.value.__context__.json_path == "item.timestamp" +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_includes_records_and_updates_incremental_cursor_1( + item_type: TestDataItemFormat, +) -> None: + data = [ + {"id": 1, "created_at": None}, + {"id": 2, "created_at": 1}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include") + ): + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(id) from some_data", [3]) + assert_query_data(p, "select count(created_at) from some_data", [2]) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 2 + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_does_not_include_overlapping_records( + item_type: TestDataItemFormat, +) -> None: + @dlt.resource + def some_data( + invocation: int, + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include"), + ): + if invocation == 1: + yield data_to_item_format( + item_type, + [ + {"id": 1, "created_at": None}, + {"id": 2, "created_at": 1}, + {"id": 3, "created_at": 2}, + ], + ) + elif invocation == 2: + yield data_to_item_format( + item_type, + [ + {"id": 4, "created_at": 1}, + {"id": 5, "created_at": None}, + {"id": 6, "created_at": 3}, + ], + ) + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(1), destination="duckdb") + p.run(some_data(2), destination="duckdb") + + assert_query_data(p, "select id from some_data order by id", [1, 2, 3, 5, 6]) + assert_query_data( + p, "select created_at from some_data order by created_at", [1, 2, 3, None, None] + ) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 3 + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_includes_records_and_updates_incremental_cursor_2( + item_type: TestDataItemFormat, +) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": None}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include") + ): + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(id) from some_data", [3]) + assert_query_data(p, "select count(created_at) from some_data", [2]) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 2 + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_includes_records_and_updates_incremental_cursor_3( + item_type: TestDataItemFormat, +) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": 2}, + {"id": 3, "created_at": None}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include") + ): + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + assert_query_data(p, "select count(id) from some_data", [3]) + assert_query_data(p, "select count(created_at) from some_data", [2]) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 2 + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_includes_records_without_cursor_path( + item_type: TestDataItemFormat, +) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="include") + ): + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + assert_query_data(p, "select count(id) from some_data", [2]) + assert_query_data(p, "select count(created_at) from some_data", [1]) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 1 + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_excludes_records_and_updates_incremental_cursor( + item_type: TestDataItemFormat, +) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": 2}, + {"id": 3, "created_at": None}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="exclude") + ): + yield source_items + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + assert_query_data(p, "select count(id) from some_data", [2]) + + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "created_at" + ] + assert s["last_value"] == 2 + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_can_raise_on_none_1(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2, "created_at": None}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise") + ): + yield source_items + + with pytest.raises(IncrementalCursorPathHasValueNone) as py_ex: + list(some_data()) + assert py_ex.value.json_path == "created_at" + + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as pip_ex: + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + + assert isinstance(pip_ex.value.__context__, IncrementalCursorPathHasValueNone) + assert pip_ex.value.__context__.json_path == "created_at" + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_can_raise_on_none_2(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise") + ): + yield source_items + + # there is no fixed, error because cursor path is missing + if item_type == "object": + with pytest.raises(IncrementalCursorPathMissing) as ex: + list(some_data()) + assert ex.value.json_path == "created_at" + # there is a fixed schema, error because value is null + else: + with pytest.raises(IncrementalCursorPathHasValueNone) as e: + list(some_data()) + assert e.value.json_path == "created_at" + + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as e: # type: ignore[assignment] + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + if item_type == "object": + assert isinstance(e.value.__context__, IncrementalCursorPathMissing) + else: + assert isinstance(e.value.__context__, IncrementalCursorPathHasValueNone) + assert e.value.__context__.json_path == "created_at" # type: ignore[attr-defined] + + +@pytest.mark.parametrize("item_type", ["arrow-table", "arrow-batch", "pandas"]) +def test_cursor_path_none_can_raise_on_column_missing(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise") + ): + yield source_items + + with pytest.raises(IncrementalCursorPathMissing) as py_ex: + list(some_data()) + assert py_ex.value.json_path == "created_at" + + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as pip_ex: + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + assert pip_ex.value.__context__.json_path == "created_at" # type: ignore[attr-defined] + assert isinstance(pip_ex.value.__context__, IncrementalCursorPathMissing) + + +def test_cursor_path_none_nested_can_raise_on_none_1() -> None: + # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "data.items[0].created_at", on_cursor_value_missing="raise" + ) + ): + yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} + + with pytest.raises(IncrementalCursorPathHasValueNone) as e: + list(some_data()) + assert e.value.json_path == "data.items[0].created_at" + + +def test_cursor_path_none_nested_can_raise_on_none_2() -> None: + # No pandas and arrow. See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "data.items[*].created_at", on_cursor_value_missing="raise" + ) + ): + yield {"data": {"items": [{"created_at": None}, {"created_at": 1}]}} + + with pytest.raises(IncrementalCursorPathHasValueNone) as e: + list(some_data()) + assert e.value.json_path == "data.items[*].created_at" + + +def test_cursor_path_none_nested_can_include_on_none_1() -> None: + # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "data.items[*].created_at", on_cursor_value_missing="include" + ) + ): + yield { + "data": { + "items": [ + {"created_at": None}, + {"created_at": 1}, + ] + } + } + + results = list(some_data()) + assert results[0]["data"]["items"] == [ + {"created_at": None}, + {"created_at": 1}, + ] + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(*) from some_data__data__items", [2]) + + +def test_cursor_path_none_nested_can_include_on_none_2() -> None: + # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "data.items[0].created_at", on_cursor_value_missing="include" + ) + ): + yield { + "data": { + "items": [ + {"created_at": None}, + {"created_at": 1}, + ] + } + } + + results = list(some_data()) + assert results[0]["data"]["items"] == [ + {"created_at": None}, + {"created_at": 1}, + ] + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(*) from some_data__data__items", [2]) + + +def test_cursor_path_none_nested_includes_rows_without_cursor_path() -> None: + # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails + @dlt.resource + def some_data( + created_at=dlt.sources.incremental( + "data.items[*].created_at", on_cursor_value_missing="include" + ) + ): + yield { + "data": { + "items": [ + {"id": 1}, + {"id": 2, "created_at": 2}, + ] + } + } + + results = list(some_data()) + assert results[0]["data"]["items"] == [ + {"id": 1}, + {"id": 2, "created_at": 2}, + ] + + p = dlt.pipeline(pipeline_name=uniq_id()) + p.run(some_data(), destination="duckdb") + + assert_query_data(p, "select count(*) from some_data__data__items", [2]) + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_set_default_value_for_incremental_cursor(item_type: TestDataItemFormat) -> None: + @dlt.resource + def some_data(created_at=dlt.sources.incremental("updated_at")): + yield data_to_item_format( + item_type, + [ + {"id": 1, "created_at": 1, "updated_at": 1}, + {"id": 2, "created_at": 4, "updated_at": None}, + {"id": 3, "created_at": 3, "updated_at": 3}, + ], + ) + + def set_default_updated_at(record): + if record.get("updated_at") is None: + record["updated_at"] = record.get("created_at", pendulum.now().int_timestamp) + return record + + def set_default_updated_at_pandas(df): + df["updated_at"] = df["updated_at"].fillna(df["created_at"]) + return df + + def set_default_updated_at_arrow(records): + updated_at_is_null = pa.compute.is_null(records.column("updated_at")) + updated_at_filled = pa.compute.if_else( + updated_at_is_null, records.column("created_at"), records.column("updated_at") + ) + if item_type == "arrow-table": + records = records.set_column( + records.schema.get_field_index("updated_at"), + pa.field("updated_at", records.column("updated_at").type), + updated_at_filled, + ) + elif item_type == "arrow-batch": + columns = [records.column(i) for i in range(records.num_columns)] + columns[2] = updated_at_filled + records = pa.RecordBatch.from_arrays(columns, schema=records.schema) + return records + + if item_type == "object": + func = set_default_updated_at + elif item_type == "pandas": + func = set_default_updated_at_pandas + elif item_type in ["arrow-table", "arrow-batch"]: + func = set_default_updated_at_arrow + + result = list(some_data().add_map(func, insert_at=1)) + values = data_item_to_list(item_type, result) + assert data_item_length(values) == 3 + assert values[1]["updated_at"] == 4 + + # same for pipeline run + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data().add_map(func, insert_at=1)) + s = p.state["sources"][p.default_schema_name]["resources"]["some_data"]["incremental"][ + "updated_at" + ] + assert s["last_value"] == 4 + + def test_json_path_cursor() -> None: @dlt.resource def some_data(last_timestamp=dlt.sources.incremental("item.timestamp|modifiedAt")): diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py index c757959bec..af3a6c239e 100644 --- a/tests/pipeline/test_pipeline_extra.py +++ b/tests/pipeline/test_pipeline_extra.py @@ -246,7 +246,6 @@ class TestRow(BaseModel): example_string: str # yield model in resource so incremental fails when looking for "id" - # TODO: support pydantic models in incremental @dlt.resource(name="table_name", primary_key="id", write_disposition="replace") def generate_rows_incremental(