convert add_limit to pipe step based limiting #2131

Merged · 16 commits · Dec 16, 2024
41 changes: 41 additions & 0 deletions dlt/extract/items.py
@@ -1,4 +1,6 @@
import inspect
import time

from abc import ABC, abstractmethod
from typing import (
Any,
@@ -238,3 +240,42 @@ class ValidateItem(ItemTransform[TDataItem]):
def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]:
self.table_name = pipe.name
return self


class LimitItem(ItemTransform[TDataItem]):
placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental

def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None:
self.max_items = max_items if max_items is not None else -1
self.max_time = max_time

def bind(self, pipe: SupportsPipe) -> "LimitItem":
self.gen = pipe.gen
self.count = 0
self.exhausted = False
self.start_time = time.time()
return self

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
self.count += 1

# detect when the limit is reached, max time or yield count
if (
(self.count == self.max_items)
or (self.max_time and time.time() - self.start_time > self.max_time)
or self.max_items == 0
):
self.exhausted = True
if inspect.isgenerator(self.gen):
self.gen.close()

# if max items is not 0, we return the last item
# otherwise never return anything
if self.max_items != 0:
return item

# do not return any late arriving items
if self.exhausted:
return None

return item
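
A minimal sketch (not part of this diff) of how `LimitItem` behaves once bound: it counts yields and closes the source generator when a limit is hit. `FakePipe` is a hypothetical stand-in for `SupportsPipe`; the import path assumes this PR's `dlt/extract/items.py`.

```py
from dlt.extract.items import LimitItem

class FakePipe:
    # hypothetical stand-in for SupportsPipe, just enough for bind()
    def __init__(self, gen):
        self.gen = gen
        self.name = "demo"

def items():
    yield from range(100)

gen = items()
limit = LimitItem(max_items=3, max_time=None).bind(FakePipe(gen))

out = [limit(item) for item in gen]
# the third yield hits max_items, so the limit closes `gen` and iteration stops
assert out == [0, 1, 2]
```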
23 changes: 22 additions & 1 deletion dlt/extract/pipe.py
@@ -39,6 +39,7 @@
wrap_compat_transformer,
wrap_resource_gen,
wrap_async_iterator,
wrap_iterator,
)


@@ -122,7 +123,23 @@ def steps(self) -> List[TPipeStep]:

def find(self, *step_type: AnyType) -> int:
"""Finds a step with object of type `step_type`"""
return next((i for i, v in enumerate(self._steps) if isinstance(v, step_type)), -1)
found = self.find_all(step_type)
return found[0] if found else -1

def find_all(self, *step_type: AnyType) -> List[int]:
"""Finds all steps with object of type `step_type`"""
return [i for i, v in enumerate(self._steps) if isinstance(v, step_type)]

def get_by_type(self, *step_type: AnyType) -> TPipeStep:
"""Gets first step found with object of type `step_type`"""
return next((v for v in self._steps if isinstance(v, step_type)), None)

def remove_by_type(self, *step_type: AnyType) -> int:
"""Deletes first step found with object of type `step_type`, returns previous index"""
step_index = self.find(*step_type)
if step_index >= 0:
self.remove_step(step_index)
return step_index
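
A hedged usage sketch for the new lookup helpers above; `pipe` is assumed to be an existing `Pipe` instance and `ValidateItem` a step type from `dlt.extract.items`:

```py
from dlt.extract.items import ValidateItem

indices = pipe.find_all(ValidateItem)  # all matching step indices
step = pipe.get_by_type(ValidateItem)  # first matching step, or None
prev_index = pipe.remove_by_type(ValidateItem)  # removes first match, returns its old index (-1 if absent)
```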

def __getitem__(self, i: int) -> TPipeStep:
return self._steps[i]
@@ -279,6 +296,10 @@ def evaluate_gen(self) -> None:
if isinstance(self.gen, AsyncIterator):
self.replace_gen(wrap_async_iterator(self.gen))

# we also wrap iterators to make them stoppable
if isinstance(self.gen, Iterator):
self.replace_gen(wrap_iterator(self.gen))

# evaluate transforms
for step_no, step in enumerate(self._steps):
# print(f"pipe {self.name} step no {step_no} step({step})")
15 changes: 10 additions & 5 deletions dlt/extract/pipe_iterator.py
@@ -24,7 +24,11 @@
)
from dlt.common.configuration.container import Container
from dlt.common.exceptions import PipelineException
from dlt.common.pipeline import unset_current_pipe_name, set_current_pipe_name
from dlt.common.pipeline import (
unset_current_pipe_name,
set_current_pipe_name,
get_current_pipe_name,
)
from dlt.common.utils import get_callable_name

from dlt.extract.exceptions import (
@@ -38,7 +42,7 @@
)
from dlt.extract.pipe import Pipe
from dlt.extract.items import DataItemWithMeta, PipeItem, ResolvablePipeItem, SourcePipeItem
from dlt.extract.utils import wrap_async_iterator
from dlt.extract.utils import wrap_async_iterator, wrap_iterator
from dlt.extract.concurrency import FuturesPool

TPipeNextItemMode = Literal["fifo", "round_robin"]
@@ -179,10 +183,12 @@ def __next__(self) -> PipeItem:

item = pipe_item.item
# if item is iterator, then add it as a new source
# we wrap it to make it stoppable
if isinstance(item, Iterator):
# print(f"adding iterable {item}")
self._sources.append(
SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta)
SourcePipeItem(
wrap_iterator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta
)
)
pipe_item = None
continue
@@ -291,7 +297,6 @@ def _get_source_item(self) -> ResolvablePipeItem:
first_evaluated_index = self._current_source_index
# always go round robin if None was returned or item is to be run as future
self._current_source_index = (self._current_source_index - 1) % sources_count

except StopIteration:
# remove empty iterator and try another source
self._sources.pop(self._current_source_index)
103 changes: 30 additions & 73 deletions dlt/extract/resource.py
@@ -2,7 +2,7 @@
from functools import partial
from typing import (
AsyncIterable,
AsyncIterator,
cast,
ClassVar,
Callable,
Iterable,
@@ -41,6 +41,7 @@
MapItem,
YieldMapItem,
ValidateItem,
LimitItem,
)
from dlt.extract.pipe_iterator import ManagedPipeIterator
from dlt.extract.pipe import Pipe, TPipeStep
@@ -214,29 +215,22 @@ def requires_args(self) -> bool:
return True

@property
def incremental(self) -> IncrementalResourceWrapper:
def incremental(self) -> Optional[IncrementalResourceWrapper]:
"""Gets incremental transform if it is in the pipe"""
incremental: IncrementalResourceWrapper = None
step_no = self._pipe.find(IncrementalResourceWrapper, Incremental)
if step_no >= 0:
incremental = self._pipe.steps[step_no] # type: ignore
return incremental
return cast(
Optional[IncrementalResourceWrapper],
self._pipe.get_by_type(IncrementalResourceWrapper, Incremental),
)

@property
def validator(self) -> Optional[ValidateItem]:
"""Gets validator transform if it is in the pipe"""
validator: ValidateItem = None
step_no = self._pipe.find(ValidateItem)
if step_no >= 0:
validator = self._pipe.steps[step_no] # type: ignore[assignment]
return validator
return cast(Optional[ValidateItem], self._pipe.get_by_type(ValidateItem))

@validator.setter
def validator(self, validator: Optional[ValidateItem]) -> None:
"""Add/remove or replace the validator in pipe"""
step_no = self._pipe.find(ValidateItem)
if step_no >= 0:
self._pipe.remove_step(step_no)
step_no = self._pipe.remove_by_type(ValidateItem)
if validator:
self.add_step(validator, insert_at=step_no if step_no >= 0 else None)

@@ -347,72 +341,37 @@ def add_filter(
self._pipe.insert_step(FilterItem(item_filter), insert_at)
return self

def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # noqa: A003
def add_limit(
self: TDltResourceImpl,
max_items: Optional[int] = None,
max_time: Optional[float] = None,
) -> TDltResourceImpl: # noqa: A003
"""Adds a limit `max_items` to the resource pipe.

This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging.
This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging.

Notes:
1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records.
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
Notes:
1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records.
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.

Args:
max_items (int): The maximum number of items to yield
Returns:
"DltResource": returns self
max_items (int): The maximum number of items to yield, set to None for no limit
max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit
Returns:
"DltResource": returns self
"""

# make sure max_items is a number, to allow "None" as value for unlimited
if max_items is None:
max_items = -1

def _gen_wrap(gen: TPipeStep) -> TPipeStep:
"""Wrap a generator to take the first `max_items` records"""

# zero items should produce empty generator
if max_items == 0:
return

count = 0
is_async_gen = False
if callable(gen):
gen = gen() # type: ignore

# wrap async gen already here
if isinstance(gen, AsyncIterator):
gen = wrap_async_iterator(gen)
is_async_gen = True

try:
for i in gen: # type: ignore # TODO: help me fix this later
yield i
if i is not None:
count += 1
# async gen yields awaitable so we must count one awaitable more
# so the previous one is evaluated and yielded.
# new awaitable will be cancelled
if count == max_items + int(is_async_gen):
return
finally:
if inspect.isgenerator(gen):
gen.close()
return

# transformers should be limited by their input, so we only limit non-transformers
if not self.is_transformer:
gen = self._pipe.gen
# wrap gen directly
if inspect.isgenerator(gen):
self._pipe.replace_gen(_gen_wrap(gen))
else:
# keep function as function to not evaluate generators before pipe starts
self._pipe.replace_gen(partial(_gen_wrap, gen))
else:
if self.is_transformer:
logger.warning(
f"Setting add_limit to a transformer {self.name} has no effect. Set the limit on"
" the top level resource."
)
else:
# remove existing limit if any
self._pipe.remove_by_type(LimitItem)
self.add_step(LimitItem(max_items=max_items, max_time=max_time))

return self
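
The warning branch above means limits on transformers are no-ops. A hedged sketch of the resulting behavior (resource and transformer names are hypothetical):

```py
import dlt

@dlt.resource
def numbers():
    yield from range(100)

@dlt.transformer
def doubled(item):
    yield item * 2

# no effect: the combined resource is a transformer, so only a warning is logged
(numbers() | doubled).add_limit(5)

# instead, limit the feeding resource
limited = numbers().add_limit(5) | doubled
```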

def parallelize(self: TDltResourceImpl) -> TDltResourceImpl:
@@ -445,9 +404,7 @@ def add_step(
return self

def _remove_incremental_step(self) -> None:
step_no = self._pipe.find(Incremental, IncrementalResourceWrapper)
if step_no >= 0:
self._pipe.remove_step(step_no)
self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper)

def set_incremental(
self,
11 changes: 11 additions & 0 deletions dlt/extract/utils.py
@@ -183,6 +183,17 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in
return meta_arg


def wrap_iterator(gen: Iterator[TDataItems]) -> Iterator[TDataItems]:
"""Wraps an iterator into a generator"""
if inspect.isgenerator(gen):
return gen

def wrapped_gen() -> Iterator[TDataItems]:
yield from gen

return wrapped_gen()
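
Why the wrapping matters (an illustrative sketch, not from this diff): a plain iterator has no `close()`, so it cannot be stopped early; the generator returned by `wrap_iterator` can be closed, which is what `LimitItem` relies on:

```py
it = iter([1, 2, 3])
assert not hasattr(it, "close")  # a list iterator cannot be closed

wrapped = wrap_iterator(it)
assert next(wrapped) == 1
wrapped.close()  # stops the underlying iteration early
```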


def wrap_async_iterator(
gen: AsyncIterator[TDataItems],
) -> Generator[Awaitable[TDataItems], None, None]:
17 changes: 16 additions & 1 deletion docs/website/docs/general-usage/resource.md
@@ -405,11 +405,26 @@ dlt.pipeline(destination="duckdb").run(my_resource().add_limit(10))
The code above will extract `15*10=150` records. This happens because each iteration yields 15 records, and we limit the number of iterations to 10.
:::

Some constraints of `add_limit` include:
Alternatively, you can apply a time limit to the resource. The code below will run the extraction for 10 seconds and extract however many items are yielded in that time. In combination with incremental loading, this can be useful for batched loading or for loading on machines that have a runtime limit.

```py
dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_time=10))
```

You can also apply a combination of both limits. In this case, the extraction stops as soon as either limit is reached.

```py
dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_items=10, max_time=10))
```


Some notes about `add_limit`:

1. `add_limit` does not skip any items. It closes the iterator/generator that produces data after the limit is reached.
2. You cannot limit transformers. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
4. Calling `add_limit` on a resource replaces any previously set limits (see the example below).
5. For time-limited resources, the timer starts when the first item is processed. When resources are processed sequentially (FIFO mode), each resource's time limit also applies sequentially. In the default round-robin mode, the time limits usually run concurrently.
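
For example, a second `add_limit` call replaces the first (a minimal illustration reusing `my_resource` from above):

```py
resource = my_resource()
resource.add_limit(10)  # limit to 10 yields
resource.add_limit(max_time=5)  # replaces the yield limit with a 5 second time limit
```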

:::tip
If you are parameterizing the value of `add_limit` and sometimes need it to be disabled, you can set `None` or `-1` to disable the limiting.
14 changes: 13 additions & 1 deletion docs/website/docs/general-usage/source.md
@@ -107,8 +107,20 @@ load_info = pipeline.run(pipedrive_source().add_limit(10))
print(load_info)
```

You can also apply a time limit to the source:

```py
pipeline.run(pipedrive_source().add_limit(max_time=10))
```

Or limit by both; whichever limit is reached first stops the extraction:

```py
pipeline.run(pipedrive_source().add_limit(max_items=10, max_time=10))
```

:::note
Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` will close the iterator/generator that produces data after the limit is reached.
Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` will close the iterator/generator that produces data after the limit is reached. Read more about `add_limit` on the resource page.
:::

Find more on sampling data [here](resource.md#sample-from-large-data).