Skip to content

Commit

Permalink
Merge branch 'devel' into feat/1603-prevent-files-removal-on-staging
Browse files Browse the repository at this point in the history
  • Loading branch information
VioletM committed Aug 28, 2024
2 parents 6a2bdd2 + e337cca commit e65f853
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 10 deletions.
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def make_remote_path(self) -> str:
)

def make_remote_uri(self) -> str:
"""Returns path on a remote filesystem as a full uri including scheme."""
return self._job_client.make_remote_uri(self.make_remote_path())

def metrics(self) -> Optional[LoadJobMetrics]:
Expand Down
26 changes: 26 additions & 0 deletions dlt/extract/incremental/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

from dlt.extract.exceptions import PipeException
from dlt.common.typing import TDataItem

Expand All @@ -13,6 +15,30 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N
super().__init__(pipe_name, msg)


class IncrementalCursorInvalidCoercion(PipeException):
def __init__(
self,
pipe_name: str,
cursor_path: str,
cursor_value: TDataItem,
cursor_value_type: str,
item: TDataItem,
item_type: Any,
details: str,
) -> None:
self.cursor_path = cursor_path
self.cursor_value = cursor_value
self.cursor_value_type = cursor_value_type
self.item = item
msg = (
f"Could not coerce {cursor_value_type} with value {cursor_value} and type"
f" {type(cursor_value)} to actual data item {item} at path {cursor_path} with type"
f" {item_type}: {details}. You need to use different data type for"
f" {cursor_value_type} or cast your data ie. by using `add_map` on this resource."
)
super().__init__(pipe_name, msg)


class IncrementalPrimaryKeyMissing(PipeException):
def __init__(self, pipe_name: str, primary_key_column: str, item: TDataItem) -> None:
self.primary_key_column = primary_key_column
Expand Down
63 changes: 54 additions & 9 deletions dlt/extract/incremental/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dlt.common.typing import TDataItem
from dlt.common.jsonpath import find_values, JSONPathFields, compile_path
from dlt.extract.incremental.exceptions import (
IncrementalCursorInvalidCoercion,
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
)
Expand Down Expand Up @@ -158,14 +159,36 @@ def __call__(

# Check whether end_value has been reached
# Filter end value ranges exclusively, so in case of "max" function we remove values >= end_value
if self.end_value is not None and (
last_value_func((row_value, self.end_value)) != self.end_value
or last_value_func((row_value,)) == self.end_value
):
return None, False, True

if self.end_value is not None:
try:
if (
last_value_func((row_value, self.end_value)) != self.end_value
or last_value_func((row_value,)) == self.end_value
):
return None, False, True
except Exception as ex:
raise IncrementalCursorInvalidCoercion(
self.resource_name,
self.cursor_path,
self.end_value,
"end_value",
row_value,
type(row_value).__name__,
str(ex),
) from ex
check_values = (row_value,) + ((last_value,) if last_value is not None else ())
new_value = last_value_func(check_values)
try:
new_value = last_value_func(check_values)
except Exception as ex:
raise IncrementalCursorInvalidCoercion(
self.resource_name,
self.cursor_path,
last_value,
"start_value/initial_value",
row_value,
type(row_value).__name__,
str(ex),
) from ex
# new_value is "less" or equal to last_value (the actual max)
if last_value == new_value:
# use func to compute row_value into last_value compatible
Expand Down Expand Up @@ -294,14 +317,36 @@ def __call__(

# If end_value is provided, filter to include table rows that are "less" than end_value
if self.end_value is not None:
end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type)
try:
end_value_scalar = to_arrow_scalar(self.end_value, cursor_data_type)
except Exception as ex:
raise IncrementalCursorInvalidCoercion(
self.resource_name,
cursor_path,
self.end_value,
"end_value",
"<arrow column>",
cursor_data_type,
str(ex),
) from ex
tbl = tbl.filter(end_compare(tbl[cursor_path], end_value_scalar))
# Is max row value higher than end value?
# NOTE: pyarrow bool *always* evaluates to python True. `as_py()` is necessary
end_out_of_range = not end_compare(row_value_scalar, end_value_scalar).as_py()

if self.start_value is not None:
start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type)
try:
start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type)
except Exception as ex:
raise IncrementalCursorInvalidCoercion(
self.resource_name,
cursor_path,
self.start_value,
"start_value/initial_value",
"<arrow column>",
cursor_data_type,
str(ex),
) from ex
# Remove rows lower or equal than the last start value
keep_filter = last_value_compare(tbl[cursor_path], start_value_scalar)
start_out_of_range = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py())
Expand Down
21 changes: 20 additions & 1 deletion tests/extract/test_incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from dlt.sources.helpers.transform import take_first
from dlt.extract.incremental import IncrementalResourceWrapper, Incremental
from dlt.extract.incremental.exceptions import (
IncrementalCursorInvalidCoercion,
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
)
Expand Down Expand Up @@ -1303,7 +1304,7 @@ def some_data(
)
# will cause invalid comparison
if item_type == "object":
with pytest.raises(InvalidStepFunctionArguments):
with pytest.raises(IncrementalCursorInvalidCoercion):
list(resource)
else:
data = data_item_to_list(item_type, list(resource))
Expand Down Expand Up @@ -2065,3 +2066,21 @@ def test_source():
incremental_steps = test_source_incremental().table_name._pipe._steps
assert isinstance(incremental_steps[-2], ValidateItem)
assert isinstance(incremental_steps[-1], IncrementalResourceWrapper)


@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_cursor_date_coercion(item_type: TestDataItemFormat) -> None:
today = datetime.today().date()

@dlt.resource()
def updated_is_int(updated_at=dlt.sources.incremental("updated_at", initial_value=today)):
data = [{"updated_at": d} for d in [1, 2, 3]]
yield data_to_item_format(item_type, data)

pip_1_name = "test_pydantic_columns_validator_" + uniq_id()
pipeline = dlt.pipeline(pipeline_name=pip_1_name, destination="duckdb")

with pytest.raises(PipelineStepFailed) as pip_ex:
pipeline.run(updated_is_int())
assert isinstance(pip_ex.value.__cause__, IncrementalCursorInvalidCoercion)
assert pip_ex.value.__cause__.cursor_path == "updated_at"

0 comments on commit e65f853

Please sign in to comment.