Skip to content

Commit

Permalink
arrow-table: processes null rows and adds them back after processing.…
Browse files Browse the repository at this point in the history
… Fails for arrow-batch
  • Loading branch information
willi-mueller committed Aug 7, 2024
1 parent d127e46 commit 6d7d4c0
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 11 deletions.
10 changes: 8 additions & 2 deletions dlt/extract/incremental/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
from dlt.common.typing import TDataItem




class IncrementalCursorPathMissing(PipeException):
def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None:
def __init__(
self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None
) -> None:
self.json_path = json_path
self.item = item
msg = (
Expand All @@ -14,7 +18,9 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N


class IncrementalCursorPathHasValueNone(PipeException):
def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None:
def __init__(
self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None
) -> None:
self.json_path = json_path
self.item = item
msg = (
Expand Down
21 changes: 15 additions & 6 deletions dlt/extract/incremental/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,6 @@ def __call__(

# TODO: Json path support. For now assume the cursor_path is a column name
cursor_path = self.cursor_path
if self.on_cursor_value_missing == "exclude":
tbl = self._remove_null_at_cursor_path(tbl, cursor_path)

# The new max/min value
try:
Expand All @@ -324,6 +322,11 @@ def __call__(
" must be a column name.",
) from e

if tbl.schema.field(cursor_path).nullable:
tbl_without_null, tbl_with_null = self._process_null_at_cursor_path(tbl)

tbl = tbl_without_null

# If end_value is provided, filter to include table rows that are "less" than end_value
if self.end_value is not None:
end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type)
Expand Down Expand Up @@ -382,6 +385,8 @@ def __call__(
)
)
)
if self.on_cursor_value_missing == "include":
tbl = pa.concat_tables([tbl, tbl_with_null])

if len(tbl) == 0:
return None, start_out_of_range, end_out_of_range
Expand All @@ -393,7 +398,11 @@ def __call__(
return tbl.to_pandas(), start_out_of_range, end_out_of_range
return tbl, start_out_of_range, end_out_of_range

def _remove_null_at_cursor_path(self, tbl, cursor_path):
mask = pa.compute.is_valid(tbl[cursor_path])
filtered = tbl.filter(mask)
return filtered
def _process_null_at_cursor_path(self, tbl: pa.Table) -> Tuple["pa.Table", "pa.Table"]:
mask = pa.compute.is_valid(tbl[self.cursor_path])
rows_without_null = tbl.filter(mask)
rows_with_null = tbl.filter(pa.compute.invert(mask))
if self.on_cursor_value_missing == "raise":
if rows_with_null.num_rows > 0:
raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path)
return rows_without_null, rows_with_null
71 changes: 68 additions & 3 deletions tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,13 +780,14 @@ def some_data(
assert s["last_value"] == 2


def test_cursor_path_none_can_raise_on_none() -> None:
# No None support for pandas and arrow yet
source_items = [
@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_cursor_path_none_can_raise_on_none_1(item_type: TestDataItemFormat) -> None:
data = [
{"id": 1, "created_at": 1},
{"id": 2, "created_at": None},
{"id": 3, "created_at": 2},
]
source_items = data_to_item_format(item_type, data)

@dlt.resource
def some_data(
Expand All @@ -807,6 +808,70 @@ def some_data(
assert pip_ex.value.__context__.json_path == "created_at"


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_cursor_path_none_can_raise_on_none_2(item_type: TestDataItemFormat) -> None:
data = [
{"id": 1, "created_at": 1},
{"id": 2},
{"id": 3, "created_at": 2},
]
source_items = data_to_item_format(item_type, data)

@dlt.resource
def some_data(
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise")
):
yield source_items

# there is no fixed, error because cursor path is missing
if item_type == "object":
with pytest.raises(IncrementalCursorPathMissing) as ex:
list(some_data())
assert ex.value.json_path == "created_at"
# there is a fixed schema, error because value is null
else:
with pytest.raises(IncrementalCursorPathHasValueNone) as e:
list(some_data())
assert e.value.json_path == "created_at"

# same thing when run in pipeline
with pytest.raises(PipelineStepFailed) as e:
p = dlt.pipeline(pipeline_name=uniq_id())
p.extract(some_data())
if item_type == "object":
assert isinstance(e.value.__context__, IncrementalCursorPathMissing)
else:
assert isinstance(e.value.__context__, IncrementalCursorPathHasValueNone)
assert e.value.__context__.json_path == "created_at"


@pytest.mark.parametrize("item_type", ["arrow-table", "arrow-batch", "pandas"])
def test_cursor_path_none_can_raise_on_column_missing(item_type: TestDataItemFormat) -> None:
data = [
{"id": 1},
{"id": 2},
{"id": 3},
]
source_items = data_to_item_format(item_type, data)

@dlt.resource
def some_data(
created_at=dlt.sources.incremental("created_at", on_cursor_value_missing="raise")
):
yield source_items

with pytest.raises(IncrementalCursorPathMissing) as py_ex:
list(some_data())
assert py_ex.value.json_path == "created_at"

# same thing when run in pipeline
with pytest.raises(PipelineStepFailed) as pip_ex:
p = dlt.pipeline(pipeline_name=uniq_id())
p.extract(some_data())
assert pip_ex.value.__context__.json_path == "created_at"
assert isinstance(pip_ex.value.__context__, IncrementalCursorPathMissing)


def test_cursor_path_none_nested_can_raise_on_none_1() -> None:
# No nested json path support for pandas and arrow. See test_nested_cursor_path_arrow_fails
@dlt.resource
Expand Down

0 comments on commit 6d7d4c0

Please sign in to comment.