Fix/1571 Incremental: Optionally load or ignore records with cursor_path missing or None value #1576

Merged
Changes from all commits (33 commits)
bbfbb04
adds failing test
willi-mueller Jul 9, 2024
b2efc6e
incremental extract does not crash if one record has cursor_path = None
willi-mueller Jul 10, 2024
99327f9
implements ability to raise on cursor_path = None or to include the r…
willi-mueller Jul 11, 2024
bfda7de
renames start_out_of_range and end_out_of_range to is_below_initial_v…
willi-mueller Jul 12, 2024
c1e765f
test that add_map can be used to transform items before the increment…
willi-mueller Jul 22, 2024
147772e
Reverts renaming of start_out_of_range and end_out_of_range to is_bel…
willi-mueller Jul 31, 2024
36957fa
Documents handling NULL values at incremental cursor
willi-mueller Jul 31, 2024
bd46dcf
tests that even records with None at cursor path are loaded
willi-mueller Aug 1, 2024
a822499
tests with full pipeline run instead of only extraction with table me…
willi-mueller Aug 1, 2024
89118b0
Allows exclusion of None values for python Objects, not Pandas or Arrows
willi-mueller Aug 1, 2024
d09436b
refactors for speed and readability
willi-mueller Aug 1, 2024
e282c9f
shortens implementation
willi-mueller Aug 2, 2024
30e3f1f
suggestions from code review
willi-mueller Aug 2, 2024
45b7635
fixes exclusion of rows with None cursor values
willi-mueller Aug 2, 2024
00263ff
`include` includes also rows where cursor_path is missing
willi-mueller Aug 2, 2024
a516754
Clarifies that value at cursor_path and cursor_path not in records ar…
willi-mueller Aug 2, 2024
2eb2928
makes supplied cursor_path pop out in error messages
willi-mueller Aug 2, 2024
c3e63e8
code formatting & docs update
willi-mueller Aug 2, 2024
ab5e334
integrate docs for the four ways of incremental processing of missing…
willi-mueller Aug 5, 2024
2820756
arrow: supports exclusion of rows where cursor value is missing
willi-mueller Aug 5, 2024
d127e46
make test suite tighter
willi-mueller Aug 5, 2024
6d7d4c0
arrow-table: processes null rows and adds them back after processing.…
willi-mueller Aug 7, 2024
d4b8b68
supports include null cursor_path rows for RecordBatches
willi-mueller Aug 7, 2024
ca7b91c
ignores typing errors
willi-mueller Aug 7, 2024
d6fde19
fixes import error when arrow is not installed
willi-mueller Aug 8, 2024
99c1e39
code formatting
willi-mueller Aug 8, 2024
06aff12
dissatisfying headstand to pass tests/pipeline/test_pipeline_extra.py…
willi-mueller Aug 8, 2024
a9d1775
extracts low-level code to method
willi-mueller Aug 20, 2024
605461c
simplifies code a bit
willi-mueller Aug 20, 2024
0b267a2
adds failing test with overlapping incremental cursor values.
willi-mueller Aug 20, 2024
9786abd
drops temp arrow index before concat and any returns
rudolfix Aug 21, 2024
4a22f34
Merge branch 'devel' into fix/1571-tolerate-record-without-incrementa…
rudolfix Aug 30, 2024
4ce384a
handles non existing attr when gettin row_value, simplifies getter code
rudolfix Aug 30, 2024
18 changes: 16 additions & 2 deletions dlt/extract/incremental/__init__.py
@@ -35,7 +35,12 @@
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
)
from dlt.extract.incremental.typing import IncrementalColumnState, TCursorValue, LastValueFunc
from dlt.extract.incremental.typing import (
IncrementalColumnState,
TCursorValue,
LastValueFunc,
OnCursorValueMissing,
)
from dlt.extract.pipe import Pipe
from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform
from dlt.extract.incremental.transform import (
@@ -81,7 +86,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
>>> info = p.run(r, destination="duckdb")

Args:
cursor_path: The name or a JSON path to an cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
cursor_path: The name or a JSON path to a cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.
initial_value: Optional value used for `last_value` when no state is available, e.g. on the first run of the pipeline. If not provided `last_value` will be `None` on the first run.
last_value_func: Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor vals from currently processing items. Default is `max`
primary_key: Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass empty tuple to disable unique checks
@@ -95,6 +100,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class.
The values passed explicitly to Incremental will be ignored.
Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loaded
on_cursor_value_missing: Specify what happens when the cursor_path does not exist in a record or a record has `None` at the cursor_path: raise, include, exclude
"""

# this is config/dataclass so declare members
@@ -104,6 +110,7 @@ class Incremental(ItemTransform[TDataItem], BaseConfiguration, Generic[TCursorVa
end_value: Optional[Any] = None
row_order: Optional[TSortOrder] = None
allow_external_schedulers: bool = False
on_cursor_value_missing: OnCursorValueMissing = "raise"

# incremental acting as empty
EMPTY: ClassVar["Incremental[Any]"] = None
@@ -118,6 +125,7 @@ def __init__(
end_value: Optional[TCursorValue] = None,
row_order: Optional[TSortOrder] = None,
allow_external_schedulers: bool = False,
on_cursor_value_missing: OnCursorValueMissing = "raise",
) -> None:
# make sure that path is valid
if cursor_path:
@@ -133,6 +141,11 @@
self._primary_key: Optional[TTableHintTemplate[TColumnNames]] = primary_key
self.row_order = row_order
self.allow_external_schedulers = allow_external_schedulers
if on_cursor_value_missing not in ["raise", "include", "exclude"]:
raise ValueError(
f"Unexpected argument for on_cursor_value_missing. Got {on_cursor_value_missing}"
)
self.on_cursor_value_missing = on_cursor_value_missing

self._cached_state: IncrementalColumnState = None
"""State dictionary cached on first access"""
@@ -171,6 +184,7 @@ def _make_transforms(self) -> None:
self.last_value_func,
self._primary_key,
set(self._cached_state["unique_hashes"]),
self.on_cursor_value_missing,
)

@classmethod
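Below is a minimal sketch of how the new constructor validation surfaces to users; the invalid mode `"drop"` is purely illustrative:

```py
import dlt

# valid modes are passed through to the incremental transforms
incremental = dlt.sources.incremental("updated_at", on_cursor_value_missing="exclude")

# anything outside "raise" / "include" / "exclude" fails at construction time
try:
    dlt.sources.incremental("updated_at", on_cursor_value_missing="drop")
except ValueError as exc:
    print(exc)  # Unexpected argument for on_cursor_value_missing. Got drop
```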
19 changes: 17 additions & 2 deletions dlt/extract/incremental/exceptions.py
@@ -5,12 +5,27 @@


class IncrementalCursorPathMissing(PipeException):
def __init__(self, pipe_name: str, json_path: str, item: TDataItem, msg: str = None) -> None:
def __init__(
self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None
) -> None:
self.json_path = json_path
self.item = item
msg = (
msg
or f"Cursor element with JSON path `{json_path}` was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document because they can be different from the names you see in database."
)
super().__init__(pipe_name, msg)


class IncrementalCursorPathHasValueNone(PipeException):
def __init__(
self, pipe_name: str, json_path: str, item: TDataItem = None, msg: str = None
) -> None:
self.json_path = json_path
self.item = item
msg = (
msg
or f"Cursor element with JSON path {json_path} was not found in extracted data item. All data items must contain this path. Use the same names of fields as in your JSON document - if those are different from the names you see in database."
or f"Cursor element with JSON path `{json_path}` has the value `None` in extracted data item. All data items must contain a value != None. Construct the incremental with on_cursor_value_none='include' if you want to include such rows"
)
super().__init__(pipe_name, msg)

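To make the distinction between the two exceptions concrete, here is a hedged sketch; the resource names and data are illustrative, and dlt may wrap the original exception in a resource-extraction error:

```py
import dlt

@dlt.resource
def missing_path(updated_at=dlt.sources.incremental("updated_at")):
    # no "updated_at" key -> IncrementalCursorPathMissing
    yield [{"id": 1}]

@dlt.resource
def null_value(updated_at=dlt.sources.incremental("updated_at")):
    # None at the cursor path -> IncrementalCursorPathHasValueNone
    yield [{"id": 1, "updated_at": None}]

for resource in (missing_path, null_value):
    try:
        list(resource())
    except Exception as exc:
        print(type(exc).__name__, exc)
```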
89 changes: 69 additions & 20 deletions dlt/extract/incremental/transform.py
@@ -1,5 +1,5 @@
from datetime import datetime, date # noqa: I251
from typing import Any, Optional, Set, Tuple, List
from datetime import datetime # noqa: I251
from typing import Any, Optional, Set, Tuple, List, Type

from dlt.common.exceptions import MissingDependencyException
from dlt.common.utils import digest128
@@ -11,8 +11,9 @@
IncrementalCursorInvalidCoercion,
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
IncrementalCursorPathHasValueNone,
)
from dlt.extract.incremental.typing import TCursorValue, LastValueFunc
from dlt.extract.incremental.typing import TCursorValue, LastValueFunc, OnCursorValueMissing
from dlt.extract.utils import resolve_column_value
from dlt.extract.items import TTableHintTemplate
from dlt.common.schema.typing import TColumnNames
@@ -55,6 +56,7 @@ def __init__(
last_value_func: LastValueFunc[TCursorValue],
primary_key: Optional[TTableHintTemplate[TColumnNames]],
unique_hashes: Set[str],
on_cursor_value_missing: OnCursorValueMissing = "raise",
) -> None:
self.resource_name = resource_name
self.cursor_path = cursor_path
@@ -67,6 +69,7 @@
self.primary_key = primary_key
self.unique_hashes = unique_hashes
self.start_unique_hashes = set(unique_hashes)
self.on_cursor_value_missing = on_cursor_value_missing

# compile jsonpath
self._compiled_cursor_path = compile_path(cursor_path)
@@ -116,21 +119,39 @@ class JsonIncremental(IncrementalTransform):
def find_cursor_value(self, row: TDataItem) -> Any:
"""Finds value in row at cursor defined by self.cursor_path.

Will use compiled JSONPath if present, otherwise it reverts to column search if row is dict
Will use compiled JSONPath if present.
Otherwise, reverts to field access if the row is a dict, a Pydantic model, or another class.
"""
row_value: Any = None
key_exc: Type[Exception] = IncrementalCursorPathHasValueNone
if self._compiled_cursor_path:
row_values = find_values(self._compiled_cursor_path, row)
if row_values:
row_value = row_values[0]
# ignores the other found values, e.g. when the path is $data.items[*].created_at
try:
row_value = find_values(self._compiled_cursor_path, row)[0]
except IndexError:
# empty list so raise a proper exception
row_value = None
key_exc = IncrementalCursorPathMissing
else:
try:
row_value = row[self.cursor_path]
except Exception:
pass
if row_value is None:
raise IncrementalCursorPathMissing(self.resource_name, self.cursor_path, row)
return row_value
try:
row_value = row[self.cursor_path]
except TypeError:
# supports Pydantic models and other classes
row_value = getattr(row, self.cursor_path)
except (KeyError, AttributeError):
# attr not found so raise a proper exception
row_value = None
key_exc = IncrementalCursorPathMissing

# if we have a value - return it
if row_value is not None:
return row_value

if self.on_cursor_value_missing == "raise":
# raise missing path or None value exception
raise key_exc(self.resource_name, self.cursor_path, row)
elif self.on_cursor_value_missing == "exclude":
return None

def __call__(
self,
@@ -144,6 +165,12 @@ def __call__(
return row, False, False

row_value = self.find_cursor_value(row)
if row_value is None:
if self.on_cursor_value_missing == "exclude":
return None, False, False
else:
return row, False, False

last_value = self.last_value
last_value_func = self.last_value_func

@@ -299,6 +326,7 @@ def __call__(

# TODO: Json path support. For now assume the cursor_path is a column name
cursor_path = self.cursor_path

# The new max/min value
try:
# NOTE: datetimes are always pendulum in UTC
@@ -310,11 +338,16 @@
self.resource_name,
cursor_path,
tbl,
f"Column name {cursor_path} was not found in the arrow table. Not nested JSON paths"
f"Column name `{cursor_path}` was not found in the arrow table. Nested JSON paths"
" are not supported for arrow tables and dataframes, the incremental cursor_path"
" must be a column name.",
) from e

if tbl.schema.field(cursor_path).nullable:
tbl_without_null, tbl_with_null = self._process_null_at_cursor_path(tbl)

tbl = tbl_without_null

# If end_value is provided, filter to include table rows that are "less" than end_value
if self.end_value is not None:
try:
@@ -396,12 +429,28 @@
)
)

# drop the temp unique index before concat and returning
if "_dlt_index" in tbl.schema.names:
tbl = pyarrow.remove_columns(tbl, ["_dlt_index"])

if self.on_cursor_value_missing == "include":
if isinstance(tbl, pa.RecordBatch):
assert isinstance(tbl_with_null, pa.RecordBatch)
tbl = pa.Table.from_batches([tbl, tbl_with_null])
else:
tbl = pa.concat_tables([tbl, tbl_with_null])

if len(tbl) == 0:
return None, start_out_of_range, end_out_of_range
try:
tbl = pyarrow.remove_columns(tbl, ["_dlt_index"])
except KeyError:
pass
if is_pandas:
return tbl.to_pandas(), start_out_of_range, end_out_of_range
tbl = tbl.to_pandas()
return tbl, start_out_of_range, end_out_of_range

def _process_null_at_cursor_path(self, tbl: "pa.Table") -> Tuple["pa.Table", "pa.Table"]:
mask = pa.compute.is_valid(tbl[self.cursor_path])
rows_without_null = tbl.filter(mask)
rows_with_null = tbl.filter(pa.compute.invert(mask))
if self.on_cursor_value_missing == "raise":
if rows_with_null.num_rows > 0:
raise IncrementalCursorPathHasValueNone(self.resource_name, self.cursor_path)
return rows_without_null, rows_with_null
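The arrow branch splits the table on the validity of the cursor column before applying the incremental filter and, in `"include"` mode, appends the null rows back afterwards. Here is a standalone sketch of the same masking logic in plain pyarrow (the table is illustrative):

```py
import pyarrow as pa
import pyarrow.compute as pc

tbl = pa.table({"id": [1, 2, 3], "updated_at": [1, None, 4]})

# mirrors _process_null_at_cursor_path: split on validity of the cursor column
mask = pc.is_valid(tbl["updated_at"])
rows_without_null = tbl.filter(mask)
rows_with_null = tbl.filter(pc.invert(mask))

print(rows_without_null.num_rows)  # 2 -> go through the incremental filter
print(rows_with_null.num_rows)     # 1 -> appended back when mode is "include"
```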
3 changes: 2 additions & 1 deletion dlt/extract/incremental/typing.py
@@ -1,8 +1,9 @@
from typing import TypedDict, Optional, Any, List, TypeVar, Callable, Sequence
from typing import TypedDict, Optional, Any, List, Literal, TypeVar, Callable, Sequence


TCursorValue = TypeVar("TCursorValue", bound=Any)
LastValueFunc = Callable[[Sequence[TCursorValue]], Any]
OnCursorValueMissing = Literal["raise", "include", "exclude"]


class IncrementalColumnState(TypedDict):
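The `Literal` lets static type checkers verify the accepted modes at call sites. A minimal sketch (the function name is illustrative):

```py
from typing import Literal

OnCursorValueMissing = Literal["raise", "include", "exclude"]

def handle(mode: OnCursorValueMissing) -> None:
    ...

handle("include")  # accepted
handle("drop")     # flagged by type checkers such as mypy
```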
69 changes: 61 additions & 8 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -689,7 +689,7 @@ than `end_value`.

:::caution
In rare cases when you use Incremental with a transformer, `dlt` will not be able to automatically close
generator associated with a row that is out of range. You can still use still call `can_close()` method on
generator associated with a row that is out of range. You can still call the `can_close()` method on
incremental and exit yield loop when true.
:::

@@ -907,22 +907,75 @@ Consider the example below for reading incremental loading parameters from "conf
```
`id_after` incrementally stores the latest `cursor_path` value for future pipeline runs.

### Loading NULL values in the incremental cursor field
### Loading when incremental cursor path is missing or value is None/NULL

When loading incrementally with a cursor field, each row is expected to contain a value at the cursor field that is not `None`.
For example, the following source data will raise an error:
You can customize how `dlt` treats records with a missing cursor path or a `None` cursor value by setting the `on_cursor_value_missing` parameter.

When loading incrementally with the default settings, there are two assumptions:
1. each row contains the cursor path
2. each row contains a value at the cursor path that is not `None`.

For example, both of the following sources will raise an error:
```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
def some_data_without_cursor_path(updated_at=dlt.sources.incremental("updated_at")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 2, "created_at": 2, "updated_at": 2},
{"id": 2, "created_at": 2}, # cursor field is missing
]

list(some_data_without_cursor_path())

@dlt.resource
def some_data_without_cursor_value(updated_at=dlt.sources.incremental("updated_at")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 3, "created_at": 4, "updated_at": None}, # value at cursor field is None
]

list(some_data_without_cursor_value())
```


To process a data set where some records do not include the incremental cursor path or where the value at the cursor path is `None`, there are the following four options:

1. Configure the incremental load to raise an exception in case there is a row where the cursor path is missing or has the value `None` using `incremental(..., on_cursor_value_missing="raise")`. This is the default behavior.
2. Configure the incremental load to include rows with a missing cursor path or a `None` cursor value using `incremental(..., on_cursor_value_missing="include")`.
3. Configure the incremental load to exclude rows with a missing cursor path or a `None` cursor value using `incremental(..., on_cursor_value_missing="exclude")`.
4. Before the incremental processing begins, ensure that the cursor field is present and transform any `None` values at the cursor path into non-`None` values. [See docs below](#transform-records-before-incremental-processing)

Here is an example of including rows where the incremental cursor value is missing or `None`:
```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="include")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 2, "created_at": 2},
{"id": 3, "created_at": 4, "updated_at": None},
]

result = list(some_data())
assert len(result) == 3
assert result[1] == {"id": 2, "created_at": 2}
assert result[2] == {"id": 3, "created_at": 4, "updated_at": None}
```

If you do not want to import records without the cursor path or where the value at the cursor path is `None`, use the following incremental configuration:

```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_missing="exclude")):
yield [
{"id": 1, "created_at": 1, "updated_at": 1},
{"id": 2, "created_at": 2},
{"id": 3, "created_at": 4, "updated_at": None},
]

list(some_data())
result = list(some_data())
assert len(result) == 1
```

### Transform records before incremental processing
If you want to load data that includes `None` values, you can transform the records before incremental processing.
You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data).
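For example, here is a sketch of replacing `None` cursor values before the incremental step runs; `insert_at=1` places the mapping step ahead of the incremental transform, and falling back to `created_at` is an illustrative choice:

```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
    yield [
        {"id": 1, "created_at": 1, "updated_at": 1},
        {"id": 2, "created_at": 2, "updated_at": None},
    ]

def set_default_updated_at(record):
    if record.get("updated_at") is None:
        record["updated_at"] = record["created_at"]
    return record

# insert_at=1 runs the mapping before the incremental step in the resource pipe
result = list(some_data().add_map(set_default_updated_at, insert_at=1))
assert len(result) == 2
assert result[1]["updated_at"] == 2
```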

Expand Down Expand Up @@ -1162,4 +1215,4 @@ sources:
}
```

Verify that the `last_value` is updated between pipeline runs.