Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provides detail exception messages when cursor stored value cannot be coerced to data #1748

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def make_remote_path(self) -> str:
)

def make_remote_uri(self) -> str:
"""Returns path on a remote filesystem as a full uri including scheme."""
return self._job_client.make_remote_uri(self.make_remote_path())

def metrics(self) -> Optional[LoadJobMetrics]:
Expand Down
26 changes: 26 additions & 0 deletions dlt/extract/incremental/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

from dlt.extract.exceptions import PipeException
from dlt.common.typing import TDataItem

Expand All @@ -13,6 +15,30 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N
super().__init__(pipe_name, msg)


class IncrementalCursorInvalidCoercion(PipeException):
def __init__(
self,
pipe_name: str,
cursor_path: str,
cursor_value: TDataItem,
cursor_value_type: str,
item: TDataItem,
item_type: Any,
details: str,
) -> None:
self.cursor_path = cursor_path
self.cursor_value = cursor_value
self.cursor_value_type = cursor_value_type
self.item = item
msg = (
f"Could not coerce {cursor_value_type} with value {cursor_value} and type"
f" {type(cursor_value)} to actual data item {item} at path {cursor_path} with type"
f" {item_type}: {details}. You need to use different data type for"
f" {cursor_value_type} or cast your data ie. by using `add_map` on this resource."
)
super().__init__(pipe_name, msg)


class IncrementalPrimaryKeyMissing(PipeException):
def __init__(self, pipe_name: str, primary_key_column: str, item: TDataItem) -> None:
self.primary_key_column = primary_key_column
Expand Down
63 changes: 54 additions & 9 deletions dlt/extract/incremental/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dlt.common.typing import TDataItem
from dlt.common.jsonpath import find_values, JSONPathFields, compile_path
from dlt.extract.incremental.exceptions import (
IncrementalCursorInvalidCoercion,
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
)
Expand Down Expand Up @@ -158,14 +159,36 @@ def __call__(

# Check whether end_value has been reached
# Filter end value ranges exclusively, so in case of "max" function we remove values >= end_value
if self.end_value is not None and (
last_value_func((row_value, self.end_value)) != self.end_value
or last_value_func((row_value,)) == self.end_value
):
return None, False, True

if self.end_value is not None:
try:
if (
last_value_func((row_value, self.end_value)) != self.end_value
or last_value_func((row_value,)) == self.end_value
):
return None, False, True
except Exception as ex:
raise IncrementalCursorInvalidCoercion(
self.resource_name,
self.cursor_path,
self.end_value,
"end_value",
row_value,
type(row_value).__name__,
str(ex),
) from ex
check_values = (row_value,) + ((last_value,) if last_value is not None else ())
new_value = last_value_func(check_values)
try:
new_value = last_value_func(check_values)
except Exception as ex:
raise IncrementalCursorInvalidCoercion(
self.resource_name,
self.cursor_path,
last_value,
"start_value/initial_value",
row_value,
type(row_value).__name__,
str(ex),
) from ex
# new_value is "less" or equal to last_value (the actual max)
if last_value == new_value:
# use func to compute row_value into last_value compatible
Expand Down Expand Up @@ -294,14 +317,36 @@ def __call__(

# If end_value is provided, filter to include table rows that are "less" than end_value
if self.end_value is not None:
end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type)
try:
end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type)
except Exception as ex:
raise IncrementalCursorInvalidCoercion(
self.resource_name,
cursor_path,
self.end_value,
"end_value",
"<arrow column>",
cursor_data_type,
str(ex),
) from ex
tbl = tbl.filter(end_compare(tbl[cursor_path], end_value_scalar))
# Is max row value higher than end value?
# NOTE: pyarrow bool *always* evaluates to python True. `as_py()` is necessary
end_out_of_range = not end_compare(row_value_scalar, end_value_scalar).as_py()

if self.start_value is not None:
start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type)
try:
start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type)
except Exception as ex:
raise IncrementalCursorInvalidCoercion(
self.resource_name,
cursor_path,
self.start_value,
"start_value/initial_value",
"<arrow column>",
cursor_data_type,
str(ex),
) from ex
# Remove rows lower or equal than the last start value
keep_filter = last_value_compare(tbl[cursor_path], start_value_scalar)
start_out_of_range = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py())
Expand Down
21 changes: 20 additions & 1 deletion tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from dlt.sources.helpers.transform import take_first
from dlt.extract.incremental import IncrementalResourceWrapper, Incremental
from dlt.extract.incremental.exceptions import (
IncrementalCursorInvalidCoercion,
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
)
Expand Down Expand Up @@ -1303,7 +1304,7 @@ def some_data(
)
# will cause invalid comparison
if item_type == "object":
with pytest.raises(InvalidStepFunctionArguments):
with pytest.raises(IncrementalCursorInvalidCoercion):
list(resource)
else:
data = data_item_to_list(item_type, list(resource))
Expand Down Expand Up @@ -2065,3 +2066,21 @@ def test_source():
incremental_steps = test_source_incremental().table_name._pipe._steps
assert isinstance(incremental_steps[-2], ValidateItem)
assert isinstance(incremental_steps[-1], IncrementalResourceWrapper)


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_cursor_date_coercion(item_type: TestDataItemFormat) -> None:
today = datetime.today().date()

@dlt.resource()
def updated_is_int(updated_at=dlt.sources.incremental("updated_at", initial_value=today)):
data = [{"updated_at": d} for d in [1, 2, 3]]
yield data_to_item_format(item_type, data)

pip_1_name = "test_pydantic_columns_validator_" + uniq_id()
pipeline = dlt.pipeline(pipeline_name=pip_1_name, destination="duckdb")

with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(updated_is_int())
assert isinstance(pip_ex.value.__cause__, IncrementalCursorInvalidCoercion)
assert pip_ex.value.__cause__.cursor_path == "updated_at"
Loading