diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index 5f4f420543..3090cf744b 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -875,6 +875,70 @@ Consider the example below for reading incremental loading parameters from "conf
 ```
 `id_after` incrementally stores the latest `cursor_path` value for future pipeline runs.
+
+### Loading NULL values in the incremental cursor field
+
+When loading incrementally with a cursor field, each row is expected to contain a non-`None` value at the cursor field.
+For example, loading the following source data will raise an error:
+```py
+data = [
+    {"id": 2, "updated_at": 2},
+    {"id": 1, "updated_at": 1},
+    {"id": 3, "updated_at": None},
+]
+```
+
+If you want to load data that includes `None` values, there are two options:
+
+1. Transform the values at the incremental cursor to a value different from `None` before the incremental object is called. [See docs below.](#transform-records-before-incremental-processing)
+2. Configure the incremental load to tolerate `None` values using `incremental(..., on_cursor_value_none="include")`.
+
+Example:
+```py
+import dlt
+
+data = [
+    {"id": 2, "updated_at": 2},
+    {"id": 1, "updated_at": 1},
+    {"id": 3, "updated_at": None},
+]
+
+@dlt.resource
+def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_none="include")):
+    yield data
+
+result = list(some_data())
+assert len(result) == 3
+assert result[2]["updated_at"] is None
+```
+
+### Transform records before incremental processing
+
+You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data).
+Use the `insert_at` parameter to control the order of execution and ensure that your custom steps run before the incremental processing starts.
+In the following example, the data-yielding step is at `index = 0`, the custom transformation at `index = 1`, and the incremental processing at `index = 2`.
+
+Example:
+```py
+import dlt
+
+data = [
+    {"id": 1, "created_at": 1, "updated_at": 1},
+    {"id": 2, "created_at": 2, "updated_at": 2},
+    {"id": 3, "created_at": 4, "updated_at": None},
+]
+
+@dlt.resource
+def some_data(updated_at=dlt.sources.incremental("updated_at")):
+    yield data
+
+def set_default_updated_at(record):
+    if record.get("updated_at") is None:
+        record["updated_at"] = record.get("created_at")
+    return record
+
+result = list(some_data().add_map(set_default_updated_at, insert_at=1))
+assert len(result) == 3
+assert result[2]["updated_at"] == 4
+```
+
+Similarly, with `add_filter(lambda r: r.get("updated_at") is not None, insert_at=1)` you can remove records without a value at `updated_at` before the incremental processing starts, as sketched below.
+
 ## Doing a full refresh
 
 You may force a full refresh of `merge` and `append` pipelines:
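
For illustration, a minimal sketch of the `add_filter` variant mentioned above. It reuses the example `data` and `some_data` resource from the transform section; `insert_at=1` places the filter step before the incremental processing:

```py
import dlt

data = [
    {"id": 1, "created_at": 1, "updated_at": 1},
    {"id": 2, "created_at": 2, "updated_at": 2},
    {"id": 3, "created_at": 4, "updated_at": None},
]

@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
    yield data

# Drop records whose cursor value is None before the incremental step sees them.
result = list(
    some_data().add_filter(lambda r: r.get("updated_at") is not None, insert_at=1)
)
assert len(result) == 2  # the record with updated_at=None was filtered out
```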