fix limiting bug and update docs
sh-rp committed Dec 12, 2024
1 parent c662da1 commit 3738c29
Showing 3 changed files with 41 additions and 6 deletions.
16 changes: 12 additions & 4 deletions dlt/extract/items.py
```diff
@@ -257,17 +257,25 @@ def bind(self, pipe: SupportsPipe) -> "LimitItem":
         return self
 
     def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
-        # detect when the limit is reached, time or yield count
-        if (self.count == self.max_items) or (
-            self.max_time and time.time() - self.start_time > self.max_time
+        self.count += 1
+
+        # detect when the limit is reached, max time or yield count
+        if (
+            (self.count == self.max_items)
+            or (self.max_time and time.time() - self.start_time > self.max_time)
+            or self.max_items == 0
         ):
             self.exhausted = True
             if inspect.isgenerator(self.gen):
                 self.gen.close()
 
+            # if max items is not 0, we return the last item
+            # otherwise never return anything
+            if self.max_items != 0:
+                return item
+
         # do not return any late arriving items
         if self.exhausted:
             return None
-        self.count += 1
 
         return item
```
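
The fix increments the counter before the limit check, so the producing generator is closed on the very item that reaches the limit instead of being pulled one extra time; closing the generator (rather than merely dropping items) also stops the producer from doing further work. Below is a minimal standalone sketch of these semantics (hypothetical `limit_items` helper, not the dlt implementation):

```py
import time
from typing import Any, Iterator, Optional

def limit_items(
    gen: Iterator[Any], max_items: Optional[int] = None, max_time: Optional[float] = None
) -> Iterator[Any]:
    # hypothetical helper mirroring the fixed logic above
    count = 0
    start_time = time.time()
    for item in gen:
        count += 1
        # limit reached: yield count, max time, or a zero-item limit
        if (
            (count == max_items)
            or (max_time and time.time() - start_time > max_time)
            or max_items == 0
        ):
            if hasattr(gen, "close"):
                gen.close()  # close the producer now, do not pull an extra item
            if max_items != 0:
                yield item  # the item that hit the limit is still emitted
            return
        yield item

def numbers() -> Iterator[int]:
    yield from range(100)

print(list(limit_items(numbers(), max_items=3)))  # [0, 1, 2]
print(list(limit_items(numbers(), max_items=0)))  # []
```
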
17 changes: 16 additions & 1 deletion docs/website/docs/general-usage/resource.md
```py
dlt.pipeline(destination="duckdb").run(my_resource().add_limit(10))
```

:::note
The code above will extract `15*10=150` records. This happens because each iteration yields 15 records, and the limit caps the number of iterations at 10.
:::
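
For illustration, here is a hypothetical definition of `my_resource` consistent with the note above; each yield emits a batch of 15 records, so `add_limit(10)` caps the number of yields, not the number of records:

```py
import dlt

@dlt.resource
def my_resource():
    # each yield is one batch of 15 records; add_limit counts yields
    for batch in range(100):
        yield [{"id": batch * 15 + i} for i in range(15)]
```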

Alternatively, you can apply a time limit to the resource. The code below runs the extraction for 10 seconds and extracts however many items are yielded in that time. In combination with incrementals, this can be useful for batched loading or for loading on machines with a run time limit.

```py
dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_time=10))
```
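
As a sketch of the batched-loading pattern mentioned above (hypothetical resource and field names), a time limit combined with an incremental cursor lets each run pick up where the previous one stopped:

```py
import dlt

@dlt.resource(primary_key="id")
def events(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00")
):
    # stand-in for an API returning records newer than the stored cursor
    for i in range(10_000):
        yield {"id": i, "updated_at": f"2024-06-{i % 28 + 1:02d}T00:00:00"}

# extract for at most 60 seconds per run; the incremental cursor persists,
# so the next run resumes where this one stopped
dlt.pipeline(destination="duckdb").run(events().add_limit(max_time=60))
```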

You can also apply a combination of both limits. In this case, the extraction stops as soon as either limit is reached.

```py
dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_items=10, max_time=10))
```


Some notes about `add_limit`:

1. `add_limit` does not skip any items. It closes the iterator/generator that produces data after the limit is reached.
2. You cannot limit transformers. They should fully process all the data they receive to avoid inconsistencies in the generated datasets.
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
4. Calling `add_limit` on a resource replaces any previously set limits.
5. For time-limited resources, the timer starts when the first item is processed. When resources are processed sequentially (FIFO mode), each resource's time limit also applies sequentially; in the default round robin mode, the time limits effectively run concurrently (see the sketch below).
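
A hedged sketch of switching the processing order, assuming dlt's `next_item_mode` extract setting can be set via an environment variable:

```py
import os

# assumption: "fifo" processes resources sequentially, while the default
# "round_robin" interleaves them, which also interleaves their time limits
os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo"
```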

:::tip
If you are parameterizing the value of `add_limit` and sometimes need it to be disabled, you can pass `None` or `-1` to disable the limit.
14 changes: 13 additions & 1 deletion docs/website/docs/general-usage/source.md
```py
load_info = pipeline.run(pipedrive_source().add_limit(10))
print(load_info)
```

You can also apply a time limit to the source:

```py
pipeline.run(pipedrive_source().add_limit(max_time=10))
```

Or apply both limits; whichever is reached first stops the extraction:

```py
pipeline.run(pipedrive_source().add_limit(max_items=10, max_time=10))
```

:::note
Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` closes the iterator/generator that produces data once the limit is reached. For more detail on `add_limit`, see the resource page.
:::

Find more on sampling data [here](resource.md#sample-from-large-data).
