implements ability to raise on cursor_path = None or to include the row. Fails for pandas & pyarrow

implements handling of a None value at cursor_path for nested JSON paths
willi-mueller committed Jul 22, 2024
1 parent 48149b6 commit b84c6e3
Showing 6 changed files with 186 additions and 56 deletions.
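For orientation, here is a minimal usage sketch of the option this commit introduces. The resource and field names are illustrative, not taken from the diff; only the `on_cursor_value_none` argument and its two values come from the change itself:

```python
import dlt

@dlt.resource
def events(
    created_at=dlt.sources.incremental(
        "created_at",                    # cursor_path into the raw JSON document
        initial_value="2024-01-01T00:00:00Z",
        on_cursor_value_none="include",  # keep rows whose cursor value is None
    )
):
    # under the default on_cursor_value_none="raise", the second row
    # would abort extraction with IncrementalCursorPathHasValueNone
    yield [
        {"id": 1, "created_at": "2024-07-01T10:00:00Z"},
        {"id": 2, "created_at": None},
    ]
```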
14 changes: 12 additions & 2 deletions dlt/extract/incremental/__init__.py
@@ -35,7 +35,12 @@
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
)
from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc
from dlt.extract.incremental.typing import (
IncrementalColumnState,
TCursorValue,
LastValueFunc,
OnCursorValueNone,
)
from dlt.extract.pipe import Pipe
from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform
from dlt.extract.incremental.transform import (
@@ -81,7 +86,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
>>> info = p.run(r, destination="duckdb")
Args:
cursor_path: The name or a JSON path to an cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
cursor_path: The name or a JSON path to a cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
initial_value: Optional value used for `last_value` when no state is available, e.g. on the first run of the pipeline. If not provided `last_value` will be `None` on the first run.
last_value_func: Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor vals from currently processing items. Default is `max`
primary_key: Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass empty tuple to disable unique checks
@@ -95,6 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed to the Incremental class.
The values passed explicitly to Incremental will be ignored.
Note that if a logical "end date" is present then "end_value" will also be set, which means that resource state is not used and exactly this range of dates will be loaded
on_cursor_value_none: Specify what happens when a record has `None` at the cursor_path: `raise` (the default) raises an exception, `include` keeps such records
"""

# this is config/dataclass so declare members
@@ -104,6 +110,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
end_value: Optional[Any] = None
row_order: Optional[TSortOrder] = None
allow_external_schedulers: bool = False
on_cursor_value_none: OnCursorValueNone = "raise"

# incremental acting as empty
EMPTY: ClassVar["Incremental[Any]"] = None
@@ -117,6 +124,7 @@ def __init__(
end_value: Optional[TCursorValue] = None,
row_order: Optional[TSortOrder] = None,
allow_external_schedulers: bool = False,
on_cursor_value_none: OnCursorValueNone = "raise",
) -> None:
# make sure that path is valid
if cursor_path:
@@ -132,6 +140,7 @@ def __init__(
self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key
self.row_order = row_order
self.allow_external_schedulers = allow_external_schedulers
self.on_cursor_value_none = on_cursor_value_none

self._cached_state: IncrementalColumnState = None
"""State dictionary cached on first access"""
@@ -170,6 +179,7 @@ def _make_transforms(self) -> None:
self.last_value_func,
self._primary_key,
set(self._cached_state["unique_hashes"]),
self.on_cursor_value_none,
)

@classmethod
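The docstring above maps one-to-one onto this constructor. A sketch of the keyword form with the new argument in context (the values are illustrative; `"raise"` is the new default, so omitting `on_cursor_value_none` preserves the previous strict behavior):

```python
import dlt

updated_at = dlt.sources.incremental(
    "updated_at",                  # name or JSON path of the cursor field in the raw document
    initial_value="2024-01-01",    # used as last_value on the very first run
    last_value_func=max,           # default; keeps the highest cursor value seen so far
    primary_key=("id",),           # optional deduplication key
    on_cursor_value_none="raise",  # new in this commit: "raise" or "include"
)
```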
13 changes: 12 additions & 1 deletion dlt/extract/incremental/exceptions.py
@@ -8,7 +8,18 @@ def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = N
self.item = item
msg = (
msg
or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database."
or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in database."
)
super().__init__(pipe_name, msg)


class IncrementalCursorPathHasValueNone(PipeException):
def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None:
self.json_path = json_path
self.item = item
msg = (
msg
or f"Cursor element with JSON path {json_path} has the value `None` in extracted data item. All data items must contain a value != None. Construct the incremental with on_cursor_value_none='include' if you want to include such rows"
)
super().__init__(pipe_name, msg)

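Because the new class is a `PipeException`, it carries the pipe name plus the `json_path` and `item` set above, so callers can surface the offending row. A sketch, assuming a `pipeline` and an `events` resource defined elsewhere; dlt usually wraps extraction errors, so this walks the exception chain rather than catching the type directly:

```python
from dlt.extract.incremental.exceptions import IncrementalCursorPathHasValueNone

try:
    pipeline.run(events)
except Exception as ex:
    inner = ex
    # walk __cause__/__context__ to find the wrapped incremental error
    while inner is not None and not isinstance(inner, IncrementalCursorPathHasValueNone):
        inner = inner.__cause__ or inner.__context__
    if inner is None:
        raise
    print(f"None cursor value at {inner.json_path}; offending item: {inner.item}")
```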
26 changes: 20 additions & 6 deletions dlt/extract/incremental/transform.py
@@ -10,8 +10,9 @@
from dlt.extract.incremental.exceptions import (
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
IncrementalCursorPathHasValueNone,
)
from dlt.extract.incremental.typing import TCursorValue, LastValueFunc
from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueNone
from dlt.extract.utils import resolve_column_value
from dlt.extract.items import TTableHintTemplate
from dlt.common.schema.typing import TColumnNames
@@ -54,6 +55,7 @@ def __init__(
last_value_func: LastValueFunc[TCursorValue],
primary_key: Optional[TTableHintTemplate[TColumnNames]],
unique_hashes: Set[str],
on_cursor_value_none: OnCursorValueNone = "raise",
) -> None:
self.resource_name = resource_name
self.cursor_path = cursor_path
@@ -66,6 +68,7 @@ def __init__(
self.primary_key = primary_key
self.unique_hashes = unique_hashes
self.start_unique_hashes = set(unique_hashes)
self.on_cursor_value_none = on_cursor_value_none

# compile jsonpath
self._compiled_cursor_path = compile_path(cursor_path)
@@ -120,14 +123,25 @@ def find_cursor_value(self, row: TDataItem) -> Any:
        if self.cursor_path not in row.keys() and not self._compiled_cursor_path:
            raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)

        row_value = self._value_at_cursor_path(row)

        if self.on_cursor_value_none == "raise" and row_value is None:
            raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row)

        return row_value

    def _value_at_cursor_path(self, row: TDataItem) -> Any:
        row_value = row.get(self.cursor_path, None)

        if self._compiled_cursor_path:
-           row_values = find_values(self._compiled_cursor_path, row)
-           if not row_values:
+           cursor_values = find_values(self._compiled_cursor_path, row)
+           if cursor_values == [] and self.on_cursor_value_none == "raise":
                raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
-           row_value = row_values[0]
+           elif None in cursor_values and self.on_cursor_value_none == "raise":
+               raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path, row)
+           else:
+               # ignores the other found values, e.g. when the path is $data.items[*].created_at
+               row_value = cursor_values[0]
        return row_value

@@ -136,7 +150,7 @@ def __call__(
) -> Tuple[Optional[TDataItem], bool, bool]:
"""
Returns:
Tuple (row, start_out_of_range, end_out_of_range) where row is either the data item or `None` if it is completely filtered out
Tuple (row, is_below_initial_value, is_above_end_value) where row is either the data item or `None` if it is completely filtered out
"""
if row is None:
return row, False, False
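The two new branches above hinge on how `find_values` reports a missing path versus a present path that holds `None`. A sketch of the three cases (assuming `compile_path` and `find_values` come from `dlt.common.jsonpath`, as used by this module):

```python
from dlt.common.jsonpath import compile_path, find_values

path = compile_path("data.created_at")

find_values(path, {"data": {"created_at": "2024-07-22"}})  # ["2024-07-22"]: used as row_value
find_values(path, {"data": {"created_at": None}})  # [None]: IncrementalCursorPathHasValueNone under "raise"
find_values(path, {"data": {}})  # []: IncrementalCursorPathMissing under "raise"
```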
3 changes: 2 additions & 1 deletion dlt/extract/incremental/typing.py
@@ -1,8 +1,9 @@
from typing import TypedDict, Optional, Any, List, TypeVar, Callable, Sequence
from typing import TypedDict, Optional, Any, List, Literal, TypeVar, Callable, Sequence


TCursorValue = TypeVar("TCursorValue", bound=Any)
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
OnCursorValueNone = Literal["raise", "include"]


class IncrementalColumnState(TypedDict):
6 changes: 3 additions & 3 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -41,7 +41,7 @@ user's profile Stateless data cannot change - for example, a recorded event, suc

Because stateless data does not need to be updated, we can just append it.

For stateful data, comes a second question - Can I extract it incrementally from the source? If yes, you should use [slowly changing dimensions (Type-2)](#scd2-strategy), which allow you to maintain historical records of data changes over time.

If not, then we need to replace the entire data set. If however we can request the data incrementally such
as "all users added or modified since yesterday" then we can simply apply changes to our existing
@@ -657,7 +657,7 @@ than `end_value`.

:::caution
In rare cases when you use Incremental with a transformer, `dlt` will not be able to automatically close
generator associated with a row that is out of range. You can still use still call `can_close()` method on
generator associated with a row that is out of range. You can still call the `can_close()` method on
incremental and exit yield loop when true.
:::
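A sketch of that manual exit, assuming a paging transformer (the `pages` resource and the field names are illustrative):

```python
import dlt

@dlt.transformer(data_from=pages)
def items(page, created_at=dlt.sources.incremental("created_at", initial_value="2024-01-01")):
    for record in page:
        if created_at.can_close():
            return  # stop yielding once rows fall outside the incremental range
        yield record
```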

@@ -1077,4 +1077,4 @@ sources:
}
```

Verify that the `last_value` is updated between pipeline runs.