From 2539429565346741d589d393452c2bd98358e5fd Mon Sep 17 00:00:00 2001
From: dat-a-man <98139823+dat-a-man@users.noreply.github.com>
Date: Mon, 25 Mar 2024 22:26:16 +0530
Subject: [PATCH] Docs: Document incremental definition via config.toml (#1118)

* Documentation update for reading incremental parameters from configuration files
* Updated
* Updated
* Update
* Improved wordings and added a bit of explanation.
* small edits

---------

Co-authored-by: AstrakhantsevaAA
---
 .../docs/general-usage/incremental-loading.md | 37 +++++++++++++++++--
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md
index fe3bb8b61d..23b2218b46 100644
--- a/docs/website/docs/general-usage/incremental-loading.md
+++ b/docs/website/docs/general-usage/incremental-loading.md
@@ -298,13 +298,13 @@ We just yield all the events and `dlt` does the filtering (using `id` column dec

GitHub returns events ordered from newest to oldest. So we declare the `rows_order` as **descending** to [stop requesting more pages once the incremental value is out of range](#declare-row-order-to-not-request-unnecessary-data). We stop requesting more data from the API after finding the first event with `created_at` earlier than `initial_value`.

:::note
**Note on Incremental Cursor Behavior:**
When using incremental cursors, it is important to understand how `dlt` handles records relative to the cursor's last value. By default, `dlt` loads only those records whose cursor value is higher than the last known value; records with a cursor value lower than or equal to the last recorded value are ignored. This keeps loading efficient by avoiding the reprocessing of records that have already been loaded, but it can be confusing if you expect older records, below the current cursor threshold, to be picked up. If your use case requires such records, consider adjusting your data extraction logic, using a full refresh strategy where appropriate, or using `last_value_func` as discussed in the subsequent section and sketched below.
:::
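The note above mentions `last_value_func` without showing it. Below is a minimal sketch for illustration only: the `events` resource, the `created_at` field, the timestamps, and the pipeline name are made up and not part of this page; only `dlt.sources.incremental` and its `last_value_func` parameter come from the library. Passing `min` instead of the default `max` makes the cursor track the lowest value seen, so progressively older records are accepted:

```py
import dlt

@dlt.resource(table_name="events")
def events(
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01T00:00:00Z",
        # `max` (the default) only lets records newer than the last saved value through;
        # `min` moves the cursor downwards, so older records are accepted.
        last_value_func=min,
    )
):
    # Stand-in for a real API call; purely illustrative data.
    yield {"id": 1, "created_at": "2023-12-31T12:00:00Z"}

pipeline = dlt.pipeline(pipeline_name="events_pipeline", destination="duckdb")
pipeline.run(events)
```

A custom callable can also be passed when neither `max` nor `min` matches how the cursor should advance.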
In "config.toml", define the `cursor_path` and `initial_value` as: + ```toml + # Configuration snippet for an incremental resource + [pipeline_with_incremental.sources.id_after] + cursor_path = "idAfter" + initial_value = 10 + ``` + + `cursor_path` is assigned the value "idAfter" with an initial value of 10. + +1. Here's how the `generate_incremental_records` resource uses `cursor_path` defined in "config.toml": + ```py + @dlt.resource(table_name="incremental_records") + def generate_incremental_records(id_after: dlt.sources.incremental = dlt.config.value): + for i in range(150): + yield {"id": i, "idAfter": i, "name": "name-" + str(i)} + + pipeline = dlt.pipeline( + pipeline_name="pipeline_with_incremental", + destination="duckdb", + ) + + pipeline.run(generate_incremental_records) + ``` + `id_after` incrementally stores the latest `cursor_path` value for future pipeline runs. ## Doing a full refresh