diff --git a/dlt/extract/incremental/exceptions.py b/dlt/extract/incremental/exceptions.py index 539a211280..e056098ba6 100644 --- a/dlt/extract/incremental/exceptions.py +++ b/dlt/extract/incremental/exceptions.py @@ -2,8 +2,12 @@ from dlt.common.typing import TDataItem + + class IncrementalCursorPathMissing(PipeException): - def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None: + def __init__( + self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None + ) -> None: self.json_path = json_path self.item = item msg = ( @@ -14,7 +18,9 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N class IncrementalCursorPathHasValueNone(PipeException): - def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None: + def __init__( + self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None + ) -> None: self.json_path = json_path self.item = item msg = ( diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index c0989fdd43..a92b0c1a0e 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -305,8 +305,6 @@ def __call__( # TODO: Json path support. For now assume the cursor_path is a column name cursor_path = self.cursor_path - if self.on_cursor_value_missing == "exclude": - tbl = self._remove_null_at_cursor_path(tbl, cursor_path) # The new max/min value try: @@ -324,6 +322,11 @@ def __call__( " must be a column name.", ) from e + if tbl.schema.field(cursor_path).nullable: + tbl_without_null, tbl_with_null = self._process_null_at_cursor_path(tbl) + + tbl = tbl_without_null + # If end_value is provided, filter to include table rows that are "less" than end_value if self.end_value is not None: end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type) @@ -382,6 +385,8 @@ def __call__( ) ) ) + if self.on_cursor_value_missing == "include": + tbl = pa.concat_tables([tbl, tbl_with_null]) if len(tbl) == 0: return None, start_out_of_range, end_out_of_range @@ -393,7 +398,11 @@ def __call__( return tbl.to_pandas(), start_out_of_range, end_out_of_range return tbl, start_out_of_range, end_out_of_range - def _remove_null_at_cursor_path(self, tbl, cursor_path): - mask = pa.compute.is_valid(tbl[cursor_path]) - filtered = tbl.filter(mask) - return filtered + def _process_null_at_cursor_path(self, tbl: pa.Table) -> Tuple["pa.Table", "pa.Table"]: + mask = pa.compute.is_valid(tbl[self.cursor_path]) + rows_without_null = tbl.filter(mask) + rows_with_null = tbl.filter(pa.compute.invert(mask)) + if self.on_cursor_value_missing == "raise": + if rows_with_null.num_rows > 0: + raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path) + return rows_without_null, rows_with_null diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 93aef790fb..d579d9b9ee 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -780,13 +780,14 @@ def some_data( assert s["last_value"] == 2 -def test_cursor_path_none_can_raise_on_none() -> None: - # No None support for pandas and arrow yet - source_items = [ +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_can_raise_on_none_1(item_type: TestDataItemFormat) -> None: + data = [ {"id": 1, "created_at": 1}, {"id": 2, "created_at": None}, {"id": 3, "created_at": 2}, ] + source_items = data_to_item_format(item_type, data) @dlt.resource def some_data( @@ -807,6 +808,70 @@ def some_data( assert pip_ex.value.__context__.json_path == "created_at" +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_path_none_can_raise_on_none_2(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1, "created_at": 1}, + {"id": 2}, + {"id": 3, "created_at": 2}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise") + ): + yield source_items + + # there is no fixed, error because cursor path is missing + if item_type == "object": + with pytest.raises(IncrementalCursorPathMissing) as ex: + list(some_data()) + assert ex.value.json_path == "created_at" + # there is a fixed schema, error because value is null + else: + with pytest.raises(IncrementalCursorPathHasValueNone) as e: + list(some_data()) + assert e.value.json_path == "created_at" + + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as e: + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + if item_type == "object": + assert isinstance(e.value.__context__, IncrementalCursorPathMissing) + else: + assert isinstance(e.value.__context__, IncrementalCursorPathHasValueNone) + assert e.value.__context__.json_path == "created_at" + + +@pytest.mark.parametrize("item_type", ["arrow-table", "arrow-batch", "pandas"]) +def test_cursor_path_none_can_raise_on_column_missing(item_type: TestDataItemFormat) -> None: + data = [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + ] + source_items = data_to_item_format(item_type, data) + + @dlt.resource + def some_data( + created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise") + ): + yield source_items + + with pytest.raises(IncrementalCursorPathMissing) as py_ex: + list(some_data()) + assert py_ex.value.json_path == "created_at" + + # same thing when run in pipeline + with pytest.raises(PipelineStepFailed) as pip_ex: + p = dlt.pipeline(pipeline_name=uniq_id()) + p.extract(some_data()) + assert pip_ex.value.__context__.json_path == "created_at" + assert isinstance(pip_ex.value.__context__, IncrementalCursorPathMissing) + + def test_cursor_path_none_nested_can_raise_on_none_1() -> None: # No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails @dlt.resource