diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 05261ccb1b..62263a10b9 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -89,6 +89,7 @@ def make_remote_path(self) -> str: ) def make_remote_uri(self) -> str: + """Returns path on a remote filesystem as a full uri including scheme.""" return self._job_client.make_remote_uri(self.make_remote_path()) def metrics(self) -> Optional[LoadJobMetrics]: diff --git a/dlt/extract/incremental/exceptions.py b/dlt/extract/incremental/exceptions.py index e318a028dc..a5f94c2974 100644 --- a/dlt/extract/incremental/exceptions.py +++ b/dlt/extract/incremental/exceptions.py @@ -1,3 +1,5 @@ +from typing import Any + from dlt.extract.exceptions import PipeException from dlt.common.typing import TDataItem @@ -13,6 +15,30 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N super().__init__(pipe_name, msg) +class IncrementalCursorInvalidCoercion(PipeException): + def __init__( + self, + pipe_name: str, + cursor_path: str, + cursor_value: TDataItem, + cursor_value_type: str, + item: TDataItem, + item_type: Any, + details: str, + ) -> None: + self.cursor_path = cursor_path + self.cursor_value = cursor_value + self.cursor_value_type = cursor_value_type + self.item = item + msg = ( + f"Could not coerce {cursor_value_type} with value {cursor_value} and type" + f" {type(cursor_value)} to actual data item {item} at path {cursor_path} with type" + f" {item_type}: {details}. You need to use different data type for" + f" {cursor_value_type} or cast your data ie. by using `add_map` on this resource." + ) + super().__init__(pipe_name, msg) + + class IncrementalPrimaryKeyMissing(PipeException): def __init__(self, pipe_name: str, primary_key_column: str, item: TDataItem) -> None: self.primary_key_column = primary_key_column diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py index 947e21f7b8..0ac9fdf520 100644 --- a/dlt/extract/incremental/transform.py +++ b/dlt/extract/incremental/transform.py @@ -8,6 +8,7 @@ from dlt.common.typing import TDataItem from dlt.common.jsonpath import find_values, JSONPathFields, compile_path from dlt.extract.incremental.exceptions import ( + IncrementalCursorInvalidCoercion, IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, ) @@ -158,14 +159,36 @@ def __call__( # Check whether end_value has been reached # Filter end value ranges exclusively, so in case of "max" function we remove values >= end_value - if self.end_value is not None and ( - last_value_func((row_value, self.end_value)) != self.end_value - or last_value_func((row_value,)) == self.end_value - ): - return None, False, True - + if self.end_value is not None: + try: + if ( + last_value_func((row_value, self.end_value)) != self.end_value + or last_value_func((row_value,)) == self.end_value + ): + return None, False, True + except Exception as ex: + raise IncrementalCursorInvalidCoercion( + self.resource_name, + self.cursor_path, + self.end_value, + "end_value", + row_value, + type(row_value).__name__, + str(ex), + ) from ex check_values = (row_value,) + ((last_value,) if last_value is not None else ()) - new_value = last_value_func(check_values) + try: + new_value = last_value_func(check_values) + except Exception as ex: + raise IncrementalCursorInvalidCoercion( + self.resource_name, + self.cursor_path, + last_value, + "start_value/initial_value", + row_value, + type(row_value).__name__, + str(ex), + ) from ex # new_value is "less" or equal to last_value (the actual max) if last_value == new_value: # use func to compute row_value into last_value compatible @@ -294,14 +317,36 @@ def __call__( # If end_value is provided, filter to include table rows that are "less" than end_value if self.end_value is not None: - end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type) + try: + end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type) + except Exception as ex: + raise IncrementalCursorInvalidCoercion( + self.resource_name, + cursor_path, + self.end_value, + "end_value", + "", + cursor_data_type, + str(ex), + ) from ex tbl = tbl.filter(end_compare(tbl[cursor_path], end_value_scalar)) # Is max row value higher than end value? # NOTE: pyarrow bool *always* evaluates to python True. `as_py()` is necessary end_out_of_range = not end_compare(row_value_scalar, end_value_scalar).as_py() if self.start_value is not None: - start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type) + try: + start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type) + except Exception as ex: + raise IncrementalCursorInvalidCoercion( + self.resource_name, + cursor_path, + self.start_value, + "start_value/initial_value", + "", + cursor_data_type, + str(ex), + ) from ex # Remove rows lower or equal than the last start value keep_filter = last_value_compare(tbl[cursor_path], start_value_scalar) start_out_of_range = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py()) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index f4082a7d86..c401552fb2 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -30,6 +30,7 @@ from dlt.sources.helpers.transform import take_first from dlt.extract.incremental import IncrementalResourceWrapper, Incremental from dlt.extract.incremental.exceptions import ( + IncrementalCursorInvalidCoercion, IncrementalCursorPathMissing, IncrementalPrimaryKeyMissing, ) @@ -1303,7 +1304,7 @@ def some_data( ) # will cause invalid comparison if item_type == "object": - with pytest.raises(InvalidStepFunctionArguments): + with pytest.raises(IncrementalCursorInvalidCoercion): list(resource) else: data = data_item_to_list(item_type, list(resource)) @@ -2065,3 +2066,21 @@ def test_source(): incremental_steps = test_source_incremental().table_name._pipe._steps assert isinstance(incremental_steps[-2], ValidateItem) assert isinstance(incremental_steps[-1], IncrementalResourceWrapper) + + +@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) +def test_cursor_date_coercion(item_type: TestDataItemFormat) -> None: + today = datetime.today().date() + + @dlt.resource() + def updated_is_int(updated_at=dlt.sources.incremental("updated_at", initial_value=today)): + data = [{"updated_at": d} for d in [1, 2, 3]] + yield data_to_item_format(item_type, data) + + pip_1_name = "test_pydantic_columns_validator_" + uniq_id() + pipeline = dlt.pipeline(pipeline_name=pip_1_name, destination="duckdb") + + with pytest.raises(PipelineStepFailed) as pip_ex: + pipeline.run(updated_is_int()) + assert isinstance(pip_ex.value.__cause__, IncrementalCursorInvalidCoercion) + assert pip_ex.value.__cause__.cursor_path == "updated_at"