Commit 2588d7f — adds docs on handling NULL values at incremental cursor path
willi-mueller committed Jul 25, 2024 (1 parent: 196a0ff)

Showing 1 changed file with 64 additions and 0 deletions: docs/website/docs/general-usage/incremental-loading.md
`id_after` incrementally stores the latest `cursor_path` value for future pipeline runs.

### Loading NULL values in the incremental cursor field

When loading incrementally with a cursor field, each row is expected to contain a non-`None` value at the cursor path.
For example, the following source data will raise an error:
```py
data = [
    {"id": 2, "updated_at": 2},
    {"id": 1, "updated_at": 1},
    {"id": 3, "updated_at": None},
]
```
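For illustration, here is a minimal sketch that triggers the failure with the default configuration (the exact exception class depends on your dlt version, so it is caught generically here):

```py
import dlt

@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
    yield data

try:
    list(some_data())
except Exception as ex:
    # dlt rejects the record whose "updated_at" value is None
    print(f"Extraction failed: {ex}")
```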

If you want to load data that includes `None` values, there are two options:

1. Transform the values at the incremental cursor to a value different from `None` before the incremental object is called ([see the docs below](#transform-records-before-incremental-processing)).
2. Configure the incremental load to tolerate `None` values using `incremental(..., on_cursor_value_none="include")`.

Example:
```py
data = [
    {"id": 2, "updated_at": 2},
    {"id": 1, "updated_at": 1},
    {"id": 3, "updated_at": None},
]

@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at", on_cursor_value_none="include")):
    yield data

result = list(some_data())
assert len(result) == 3
assert result[2]["updated_at"] is None
```

### Transform records before incremental processing
You can add steps to the pipeline that [filter, transform, or pivot your data](../general-usage/resource.md#filter-transform-and-pivot-data).
It is important to use the `insert_at` parameter to control the order of execution and ensure that your custom steps run before the incremental processing starts.
In the following example, the step of data yielding is at `index = 0`, the custom transformation at `index = 1`, and the incremental processing at `index = 2`.

Example:
```py
data = [
    {"id": 1, "created_at": 1, "updated_at": 1},
    {"id": 2, "created_at": 2, "updated_at": 2},
    {"id": 3, "created_at": 4, "updated_at": None},
]

@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
    yield data

def set_default_updated_at(record):
    # fall back to "created_at" when the cursor value is None
    if record.get("updated_at") is None:
        record["updated_at"] = record.get("created_at")
    return record

result = list(some_data().add_map(set_default_updated_at, insert_at=1))
assert len(result) == 3
assert result[2]["updated_at"] == 4
```

Similarly, with `add_filter(lambda r: r.get("updated_at") is not None, insert_at=1)` you can remove records without a value in `updated_at` before the incremental processing starts, as sketched below.
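A minimal sketch, reusing the `data` and resource from the example above:

```py
@dlt.resource
def some_data(updated_at=dlt.sources.incremental("updated_at")):
    yield data

# drop records with a None cursor value before the incremental step runs
result = list(
    some_data().add_filter(lambda r: r.get("updated_at") is not None, insert_at=1)
)
assert len(result) == 2
```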


## Doing a full refresh

You may force a full refresh of `merge` and `append` pipelines:
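A minimal sketch of such a refresh (the destination, pipeline, and resource names here are illustrative, not from the original):

```py
import dlt

@dlt.resource(write_disposition="append")
def items():
    yield from [{"id": 1}, {"id": 2}]

p = dlt.pipeline(pipeline_name="refresh_demo", destination="duckdb")

# write_disposition="replace" overrides the resource's "append" disposition
# for this run and replaces all previously loaded data
p.run(items, write_disposition="replace")
```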
