From fe174f851d2db29f0c3675a57175638dfeafa3f9 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 10 Dec 2024 16:45:26 +0100 Subject: [PATCH 01/16] convert add_limit to step based limiting --- dlt/extract/items.py | 20 ++++++++++++++ dlt/extract/pipe.py | 5 ++++ dlt/extract/pipe_iterator.py | 15 +++++++---- dlt/extract/resource.py | 51 ++++-------------------------------- dlt/extract/utils.py | 11 ++++++++ 5 files changed, 51 insertions(+), 51 deletions(-) diff --git a/dlt/extract/items.py b/dlt/extract/items.py index 888787e6b7..21738abcef 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -238,3 +238,23 @@ class ValidateItem(ItemTransform[TDataItem]): def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: self.table_name = pipe.name return self + + +class LimitItem(ItemTransform[TDataItem]): + placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental + + def __init__(self, max_items: int) -> None: + self.max_items = max_items if max_items is not None else -1 + + def bind(self, pipe: SupportsPipe) -> "LimitItem": + self.gen = pipe.gen + self.count = 0 + return self + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if self.count == self.max_items: + if inspect.isgenerator(self.gen): + self.gen.close() + return None + self.count += 1 + return item diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 02b52c4623..0e24de9558 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -39,6 +39,7 @@ wrap_compat_transformer, wrap_resource_gen, wrap_async_iterator, + wrap_iterator, ) @@ -279,6 +280,10 @@ def evaluate_gen(self) -> None: if isinstance(self.gen, AsyncIterator): self.replace_gen(wrap_async_iterator(self.gen)) + # we also wrap iterators to make them stoppable + if isinstance(self.gen, Iterator): + self.replace_gen(wrap_iterator(self.gen)) + # evaluate transforms for step_no, step in enumerate(self._steps): # print(f"pipe {self.name} step no {step_no} step({step})") diff --git a/dlt/extract/pipe_iterator.py b/dlt/extract/pipe_iterator.py index 465040f9f4..5fa62ffa63 100644 --- a/dlt/extract/pipe_iterator.py +++ b/dlt/extract/pipe_iterator.py @@ -24,7 +24,11 @@ ) from dlt.common.configuration.container import Container from dlt.common.exceptions import PipelineException -from dlt.common.pipeline import unset_current_pipe_name, set_current_pipe_name +from dlt.common.pipeline import ( + unset_current_pipe_name, + set_current_pipe_name, + get_current_pipe_name, +) from dlt.common.utils import get_callable_name from dlt.extract.exceptions import ( @@ -38,7 +42,7 @@ ) from dlt.extract.pipe import Pipe from dlt.extract.items import DataItemWithMeta, PipeItem, ResolvablePipeItem, SourcePipeItem -from dlt.extract.utils import wrap_async_iterator +from dlt.extract.utils import wrap_async_iterator, wrap_iterator from dlt.extract.concurrency import FuturesPool TPipeNextItemMode = Literal["fifo", "round_robin"] @@ -179,10 +183,12 @@ def __next__(self) -> PipeItem: item = pipe_item.item # if item is iterator, then add it as a new source + # we wrap it to make it stoppable if isinstance(item, Iterator): - # print(f"adding iterable {item}") self._sources.append( - SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta) + SourcePipeItem( + wrap_iterator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta + ) ) pipe_item = None continue @@ -291,7 +297,6 @@ def _get_source_item(self) -> ResolvablePipeItem: first_evaluated_index = self._current_source_index # always go round robin if None was returned or 
item is to be run as future self._current_source_index = (self._current_source_index - 1) % sources_count - except StopIteration: # remove empty iterator and try another source self._sources.pop(self._current_source_index) diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 42e3905162..689e0a91f8 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -41,6 +41,7 @@ MapItem, YieldMapItem, ValidateItem, + LimitItem, ) from dlt.extract.pipe_iterator import ManagedPipeIterator from dlt.extract.pipe import Pipe, TPipeStep @@ -363,56 +364,14 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no "DltResource": returns self """ - # make sure max_items is a number, to allow "None" as value for unlimited - if max_items is None: - max_items = -1 - - def _gen_wrap(gen: TPipeStep) -> TPipeStep: - """Wrap a generator to take the first `max_items` records""" - - # zero items should produce empty generator - if max_items == 0: - return - - count = 0 - is_async_gen = False - if callable(gen): - gen = gen() # type: ignore - - # wrap async gen already here - if isinstance(gen, AsyncIterator): - gen = wrap_async_iterator(gen) - is_async_gen = True - - try: - for i in gen: # type: ignore # TODO: help me fix this later - yield i - if i is not None: - count += 1 - # async gen yields awaitable so we must count one awaitable more - # so the previous one is evaluated and yielded. - # new awaitable will be cancelled - if count == max_items + int(is_async_gen): - return - finally: - if inspect.isgenerator(gen): - gen.close() - return - - # transformers should be limited by their input, so we only limit non-transformers - if not self.is_transformer: - gen = self._pipe.gen - # wrap gen directly - if inspect.isgenerator(gen): - self._pipe.replace_gen(_gen_wrap(gen)) - else: - # keep function as function to not evaluate generators before pipe starts - self._pipe.replace_gen(partial(_gen_wrap, gen)) - else: + if self.is_transformer: logger.warning( f"Setting add_limit to a transformer {self.name} has no effect. Set the limit on" " the top level resource." 
) + else: + self.add_step(LimitItem(max_items)) + return self def parallelize(self: TDltResourceImpl) -> TDltResourceImpl: diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index 68570d0995..0bcd13155e 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -183,6 +183,17 @@ def check_compat_transformer(name: str, f: AnyFun, sig: inspect.Signature) -> in return meta_arg +def wrap_iterator(gen: Iterator[TDataItems]) -> Iterator[TDataItems]: + """Wraps an iterator into a generator""" + if inspect.isgenerator(gen): + return gen + + def wrapped_gen() -> Iterator[TDataItems]: + yield from gen + + return wrapped_gen() + + def wrap_async_iterator( gen: AsyncIterator[TDataItems], ) -> Generator[Awaitable[TDataItems], None, None]: From fca53a8956e3f85e7d7658468221361febbd47fc Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 11 Dec 2024 15:39:03 +0100 Subject: [PATCH 02/16] prevent late arriving items to be forwarded from limit add some convenience methods for pipe step management --- dlt/extract/items.py | 5 +++++ dlt/extract/pipe.py | 18 +++++++++++++++++- dlt/extract/resource.py | 31 ++++++++++++------------------- 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/dlt/extract/items.py b/dlt/extract/items.py index 21738abcef..893c33f475 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -249,12 +249,17 @@ def __init__(self, max_items: int) -> None: def bind(self, pipe: SupportsPipe) -> "LimitItem": self.gen = pipe.gen self.count = 0 + self.exceeded = False return self def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + # detect when the limit is reached if self.count == self.max_items: + self.exhausted = True if inspect.isgenerator(self.gen): self.gen.close() + # do not return any late arriving items + if self.exhausted: return None self.count += 1 return item diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 0e24de9558..411df40c35 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -123,7 +123,23 @@ def steps(self) -> List[TPipeStep]: def find(self, *step_type: AnyType) -> int: """Finds a step with object of type `step_type`""" - return next((i for i, v in enumerate(self._steps) if isinstance(v, step_type)), -1) + found = self.find_all(step_type) + return found[0] if found else -1 + + def find_all(self, *step_type: AnyType) -> List[int]: + """Finds all steps with object of type `step_type`""" + return [i for i, v in enumerate(self._steps) if isinstance(v, step_type)] + + def get_by_type(self, *step_type: AnyType) -> TPipeStep: + """Gets first step found with object of type `step_type`""" + return next((v for v in self._steps if isinstance(v, step_type)), None) + + def remove_by_type(self, *step_type: AnyType) -> int: + """Deletes first step found with object of type `step_type`, returns previous index""" + step_index = self.find(*step_type) + if step_index >= 0: + self.remove_step(step_index) + return step_index def __getitem__(self, i: int) -> TPipeStep: return self._steps[i] diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 689e0a91f8..6e47e77aa3 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -2,7 +2,7 @@ from functools import partial from typing import ( AsyncIterable, - AsyncIterator, + cast, ClassVar, Callable, Iterable, @@ -215,31 +215,24 @@ def requires_args(self) -> bool: return True @property - def incremental(self) -> IncrementalResourceWrapper: + def incremental(self) -> Optional[IncrementalResourceWrapper]: """Gets incremental transform if it is in the 
pipe""" - incremental: IncrementalResourceWrapper = None - step_no = self._pipe.find(IncrementalResourceWrapper, Incremental) - if step_no >= 0: - incremental = self._pipe.steps[step_no] # type: ignore - return incremental + return cast( + Optional[IncrementalResourceWrapper], + self._pipe.get_by_type(IncrementalResourceWrapper, Incremental), + ) @property def validator(self) -> Optional[ValidateItem]: """Gets validator transform if it is in the pipe""" - validator: ValidateItem = None - step_no = self._pipe.find(ValidateItem) - if step_no >= 0: - validator = self._pipe.steps[step_no] # type: ignore[assignment] - return validator + return cast(Optional[ValidateItem], self._pipe.get_by_type(ValidateItem)) @validator.setter def validator(self, validator: Optional[ValidateItem]) -> None: """Add/remove or replace the validator in pipe""" - step_no = self._pipe.find(ValidateItem) - if step_no >= 0: - self._pipe.remove_step(step_no) + self._pipe.remove_by_type(ValidateItem) if validator: - self.add_step(validator, insert_at=step_no if step_no >= 0 else None) + self.add_step(validator) @property def max_table_nesting(self) -> Optional[int]: @@ -370,6 +363,8 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no " the top level resource." ) else: + # remove existing limit if any + self._pipe.remove_by_type(LimitItem) self.add_step(LimitItem(max_items)) return self @@ -404,9 +399,7 @@ def add_step( return self def _remove_incremental_step(self) -> None: - step_no = self._pipe.find(Incremental, IncrementalResourceWrapper) - if step_no >= 0: - self._pipe.remove_step(step_no) + self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper) def set_incremental( self, From 5c8ef139596a25e1c5760a18c3d9177080feafc9 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 11 Dec 2024 16:32:42 +0100 Subject: [PATCH 03/16] added a few more tests for limit --- dlt/extract/items.py | 2 +- tests/extract/test_sources.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/dlt/extract/items.py b/dlt/extract/items.py index 893c33f475..13002a05a3 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -249,7 +249,7 @@ def __init__(self, max_items: int) -> None: def bind(self, pipe: SupportsPipe) -> "LimitItem": self.gen = pipe.gen self.count = 0 - self.exceeded = False + self.exhausted = False return self def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 3d021d5d10..d3d5016392 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -861,6 +861,40 @@ async def r_async(): raise AssertionError(f"Unexpected limit: {limit}") +def test_various_limit_setups() -> None: + # basic test + r = dlt.resource([1, 2, 3, 4, 5], name="test").add_limit(3) + assert list(r) == [1, 2, 3] + + # yield map test + r = ( + dlt.resource([1, 2, 3, 4, 5], name="test") + .add_map(lambda i: str(i) * i, 1) + .add_yield_map(lambda i: (yield from i)) + .add_limit(3) + ) + # limit is applied at the end + assert list(r) == ["1", "2", "2"] # "3" ,"3" ,"3" ,"4" ,"4" ,"4" ,"4", ...] 
+ + # nested lists test (limit only applied to yields, not actual items) + r = dlt.resource([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], name="test").add_limit(3) + assert list(r) == [1, 2, 3, 4, 5, 6, 7, 8, 9] + + # transformer test + r = dlt.resource([1, 2, 3, 4, 5], name="test").add_limit(4) + t = dlt.transformer(lambda i: i * 2, name="test") + assert list(r) == [1, 2, 3, 4] + assert list(r | t) == [2, 4, 6, 8] + + # adding limit to transformer is disregarded + t = t.add_limit(2) + assert list(r | t) == [2, 4, 6, 8] + + # limits are fully replaced (more genereous limit applied later takes precedence) + r = dlt.resource([1, 2, 3, 4, 5], name="test").add_limit(3).add_limit(4) + assert list(r) == [1, 2, 3, 4] + + def test_limit_source() -> None: def mul_c(item): yield from "A" * (item + 2) From e4a9ec3dffefbadfaaa1a7a4dbfbe4866f30ccf8 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 11 Dec 2024 17:13:45 +0100 Subject: [PATCH 04/16] add more limit functions from branch --- dlt/extract/items.py | 29 ++++++++++++++++-- dlt/extract/resource.py | 27 +++++++++++------ tests/extract/test_sources.py | 57 +++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+), 13 deletions(-) diff --git a/dlt/extract/items.py b/dlt/extract/items.py index 13002a05a3..cc4e366744 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -1,4 +1,6 @@ import inspect +import time + from abc import ABC, abstractmethod from typing import ( Any, @@ -243,23 +245,44 @@ def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: class LimitItem(ItemTransform[TDataItem]): placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental - def __init__(self, max_items: int) -> None: + def __init__( + self, max_items: Optional[int], max_time: Optional[float], min_wait: Optional[float] + ) -> None: self.max_items = max_items if max_items is not None else -1 + self.max_time = max_time + self.min_wait = min_wait def bind(self, pipe: SupportsPipe) -> "LimitItem": self.gen = pipe.gen self.count = 0 self.exhausted = False + self.start_time = time.time() + self.last_call_time = 0.0 return self def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - # detect when the limit is reached - if self.count == self.max_items: + # detect when the limit is reached, time or yield count + if (self.count == self.max_items) or ( + self.max_time and time.time() - self.start_time > self.max_time + ): self.exhausted = True if inspect.isgenerator(self.gen): self.gen.close() + # do not return any late arriving items if self.exhausted: return None self.count += 1 + + # if we have a min wait and the last iteration was less than min wait ago, + # we sleep on this thread a bit + if self.min_wait and (time.time() - self.last_call_time) < self.min_wait: + # NOTE: this should be interruptable? + # NOTE: this is sleeping on the main thread, we should carefully document this! 
+ time.sleep(self.min_wait - (time.time() - self.last_call_time)) + + # remember last iteration time + if self.min_wait: + self.last_call_time = time.time() + return item diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 6e47e77aa3..cd40ccb87b 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -341,20 +341,27 @@ def add_filter( self._pipe.insert_step(FilterItem(item_filter), insert_at) return self - def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # noqa: A003 + def add_limit( + self: TDltResourceImpl, + max_items: Optional[int] = None, + max_time: Optional[float] = None, + min_wait: Optional[float] = None, + ) -> TDltResourceImpl: # noqa: A003 """Adds a limit `max_items` to the resource pipe. - This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging. + This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging. - Notes: - 1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets. - 2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records. - 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. + Notes: + 1. Transformers won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets. + 2. Each yielded item may contain several records. `add_limit` only limits the "number of yields", not the total number of records. + 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic. 
Args: - max_items (int): The maximum number of items to yield - Returns: - "DltResource": returns self + max_items (int): The maximum number of items to yield, set to None for no limit + max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit + min_wait (float): The minimum number of seconds to wait between iterations (useful for rate limiting) + Returns: + "DltResource": returns self """ if self.is_transformer: @@ -365,7 +372,7 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no else: # remove existing limit if any self._pipe.remove_by_type(LimitItem) - self.add_step(LimitItem(max_items)) + self.add_step(LimitItem(max_items=max_items, max_time=max_time, min_wait=min_wait)) return self diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index d3d5016392..946b0ee2b8 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -1,4 +1,6 @@ import itertools +import time + from typing import Iterator import pytest @@ -910,6 +912,61 @@ def infinite_source(): assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3 +def test_limit_max_time() -> None: + @dlt.resource() + def r(): + for i in range(100): + time.sleep(0.1) + yield i + + @dlt.resource() + async def r_async(): + for i in range(100): + await asyncio.sleep(0.1) + yield i + + sync_list = list(r().add_limit(max_time=1)) + async_list = list(r_async().add_limit(max_time=1)) + + # we should have extracted 10 items within 1 second, sleep is included in the resource + # we allow for some variance in the number of items, as the sleep is not super precise + allowed_results = [ + list(range(12)), + list(range(11)), + list(range(10)), + list(range(9)), + list(range(8)), + ] + assert sync_list in allowed_results + assert async_list in allowed_results + + +def test_limit_min_wait() -> None: + @dlt.resource() + def r(): + for i in range(100): + yield i + + @dlt.resource() + async def r_async(): + for i in range(100): + yield i + + sync_list = list(r().add_limit(max_time=1, min_wait=0.2)) + async_list = list(r_async().add_limit(max_time=1, min_wait=0.2)) + + # we should have extracted about 5 items within 1 second, sleep is done via min_wait + allowed_results = [ + list(range(3)), + list(range(4)), + list(range(5)), + list(range(6)), + list(range(7)), + ] + assert sync_list in allowed_results + assert async_list in allowed_results + + def test_source_state() -> None: @dlt.source def test_source(expected_state): From 33faff5098f8dd188b90229fecaeae792ec8b0b3 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 12 Dec 2024 08:22:43 +0100 Subject: [PATCH 05/16] remove rate-limiting --- dlt/extract/items.py | 17 +---------------- dlt/extract/resource.py | 4 +--- tests/extract/test_sources.py | 26 -------------------------- 3 files changed, 2 insertions(+), 45 deletions(-) diff --git a/dlt/extract/items.py b/dlt/extract/items.py index cc4e366744..db53794f75 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -245,19 +245,15 @@ def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: class LimitItem(ItemTransform[TDataItem]): placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental - def __init__( - self, max_items: Optional[int], max_time: Optional[float], min_wait: Optional[float] - ) -> None: + def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None: self.max_items = max_items if max_items is not None else -1 self.max_time = 
max_time - self.min_wait = min_wait def bind(self, pipe: SupportsPipe) -> "LimitItem": self.gen = pipe.gen self.count = 0 self.exhausted = False self.start_time = time.time() - self.last_call_time = 0.0 return self def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: @@ -274,15 +270,4 @@ def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: return None self.count += 1 - # if we have a min wait and the last iteration was less than min wait ago, - # we sleep on this thread a bit - if self.min_wait and (time.time() - self.last_call_time) < self.min_wait: - # NOTE: this should be interruptable? - # NOTE: this is sleeping on the main thread, we should carefully document this! - time.sleep(self.min_wait - (time.time() - self.last_call_time)) - - # remember last iteration time - if self.min_wait: - self.last_call_time = time.time() - return item diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index cd40ccb87b..39206f6d50 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -345,7 +345,6 @@ def add_limit( self: TDltResourceImpl, max_items: Optional[int] = None, max_time: Optional[float] = None, - min_wait: Optional[float] = None, ) -> TDltResourceImpl: # noqa: A003 """Adds a limit `max_items` to the resource pipe. @@ -359,7 +358,6 @@ def add_limit( Args: max_items (int): The maximum number of items to yield, set to None for no limit max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit - min_wait (float): The minimum number of seconds to wait between iterations (useful for rate limiting) Returns: "DltResource": returns self """ @@ -372,7 +370,7 @@ def add_limit( else: # remove existing limit if any self._pipe.remove_by_type(LimitItem) - self.add_step(LimitItem(max_items=max_items, max_time=max_time, min_wait=min_wait)) + self.add_step(LimitItem(max_items=max_items, max_time=max_time)) return self diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index 946b0ee2b8..80fb8017ad 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -941,32 +941,6 @@ async def r_async(): assert async_list in allowed_results -def test_limit_min_wait() -> None: - @dlt.resource() - def r(): - for i in range(100): - yield i - - @dlt.resource() - async def r_async(): - for i in range(100): - yield i - - sync_list = list(r().add_limit(max_time=1, min_wait=0.2)) - async_list = list(r_async().add_limit(max_time=1, min_wait=0.2)) - - # we should have extracted about 5 items within 1 second, sleep is done via min_wait - allowed_results = [ - list(range(3)), - list(range(4)), - list(range(5)), - list(range(6)), - list(range(7)), - ] - assert sync_list in allowed_results - assert async_list in allowed_results - - def test_source_state() -> None: @dlt.source def test_source(expected_state): From e3d461074181561807f179c1f51a4b7823ca0485 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 12 Dec 2024 08:42:27 +0100 Subject: [PATCH 06/16] fix limiting bug and update docs --- dlt/extract/items.py | 16 ++++++++++++---- docs/website/docs/general-usage/resource.md | 17 ++++++++++++++++- docs/website/docs/general-usage/source.md | 14 +++++++++++++- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/dlt/extract/items.py b/dlt/extract/items.py index db53794f75..b3279b1d61 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -257,17 +257,25 @@ def bind(self, pipe: SupportsPipe) -> "LimitItem": return self def __call__(self, 
item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
-        # detect when the limit is reached, time or yield count
-        if (self.count == self.max_items) or (
-            self.max_time and time.time() - self.start_time > self.max_time
+        self.count += 1
+
+        # detect when the limit is reached, max time or yield count
+        if (
+            (self.count == self.max_items)
+            or (self.max_time and time.time() - self.start_time > self.max_time)
+            or self.max_items == 0
         ):
             self.exhausted = True
             if inspect.isgenerator(self.gen):
                 self.gen.close()
 
+            # if max items is not 0, we return the last item
+            # otherwise never return anything
+            if self.max_items != 0:
+                return item
+
         # do not return any late arriving items
         if self.exhausted:
             return None
-        self.count += 1
 
         return item
diff --git a/docs/website/docs/general-usage/resource.md b/docs/website/docs/general-usage/resource.md
index 199eaf9b5d..b8d51caf75 100644
--- a/docs/website/docs/general-usage/resource.md
+++ b/docs/website/docs/general-usage/resource.md
@@ -405,11 +405,26 @@ dlt.pipeline(destination="duckdb").run(my_resource().add_limit(10))
 The code above will extract `15*10=150` records. This is happening because in each iteration, 15 records are yielded, and we're limiting the number of iterations to 10.
 :::
 
-Some constraints of `add_limit` include:
+Alternatively, you can also apply a time limit to the resource. The code below will run the extraction for 10 seconds and extract however many items are yielded in that time. In combination with incrementals, this can be useful for batched loading or for loading on machines that have a run time limit.
+
+```py
+dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_time=10))
+```
+
+You can also apply a combination of both limits. In this case, the extraction will stop as soon as either limit is reached.
+
+```py
+dlt.pipeline(destination="duckdb").run(my_resource().add_limit(max_items=10, max_time=10))
+```
+
+
+Some notes about `add_limit`:
 
 1. `add_limit` does not skip any items. It closes the iterator/generator that produces data after the limit is reached.
 2. You cannot limit transformers. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
 3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.
+4. Calling `add_limit` on a resource will replace any previously set limits.
+5. For time-limited resources, the timer starts when the first item is processed. When resources are processed sequentially (FIFO mode), each resource's time limit also applies sequentially. In the default round robin mode, the time limits will usually run concurrently.
 
 :::tip
 If you are parameterizing the value of `add_limit` and sometimes need it to be disabled, you can set `None` or `-1` to disable the limiting.
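A quick illustration of point 4 above: a later `add_limit` call removes the previously installed `LimitItem` step instead of stacking on top of it. The sketch below mirrors what `test_various_limit_setups` in this series asserts; the resource name and values are illustrative only.

```py
import dlt

# the second add_limit replaces the first, so the later, larger limit wins
r = dlt.resource([1, 2, 3, 4, 5], name="numbers").add_limit(3).add_limit(4)
assert list(r) == [1, 2, 3, 4]
```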
diff --git a/docs/website/docs/general-usage/source.md b/docs/website/docs/general-usage/source.md
index 87c07a3e44..9c6c2aac13 100644
--- a/docs/website/docs/general-usage/source.md
+++ b/docs/website/docs/general-usage/source.md
@@ -107,8 +107,20 @@ load_info = pipeline.run(pipedrive_source().add_limit(10))
 print(load_info)
 ```
 
+You can also apply a time limit to the source:
+
+```py
+pipeline.run(pipedrive_source().add_limit(max_time=10))
+```
+
+Or limit by both; whichever limit is reached first will stop the extraction:
+
+```py
+pipeline.run(pipedrive_source().add_limit(max_items=10, max_time=10))
+```
+
 :::note
-Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` will close the iterator/generator that produces data after the limit is reached.
+Note that `add_limit` **does not limit the number of records** but rather the "number of yields". `dlt` will close the iterator/generator that produces data after the limit is reached. Please read more about `add_limit` on the resource page.
 :::
 
 Find more on sampling data [here](resource.md#sample-from-large-data).

From c9fe2b36c8c2bdf014ec0376d39c6edb30566395 Mon Sep 17 00:00:00 2001
From: Dave
Date: Sun, 15 Dec 2024 21:13:28 +0100
Subject: [PATCH 07/16] revert back to inserting validator step at the same position if replaced

---
 dlt/extract/resource.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py
index 39206f6d50..2f4f66a96e 100644
--- a/dlt/extract/resource.py
+++ b/dlt/extract/resource.py
@@ -230,9 +230,9 @@ def validator(self) -> Optional[ValidateItem]:
     @validator.setter
     def validator(self, validator: Optional[ValidateItem]) -> None:
         """Add/remove or replace the validator in pipe"""
-        self._pipe.remove_by_type(ValidateItem)
+        step_no = self._pipe.remove_by_type(ValidateItem)
         if validator:
-            self.add_step(validator)
+            self.add_step(validator, insert_at=step_no if step_no >= 0 else None)
 
     @property
     def max_table_nesting(self) -> Optional[int]:

From ac31cfd52244e569793bc81f1faba787ae310d30 Mon Sep 17 00:00:00 2001
From: Dave
Date: Sun, 15 Dec 2024 21:18:26 +0100
Subject: [PATCH 08/16] make time limit tests more lenient for mac os tests

---
 tests/extract/test_sources.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py
index 80fb8017ad..fbc0767744 100644
--- a/tests/extract/test_sources.py
+++ b/tests/extract/test_sources.py
@@ -930,13 +930,8 @@ async def r_async():
 
     # we should have extracted 10 items within 1 second, sleep is included in the resource
     # we allow for some variance in the number of items, as the sleep is not super precise
-    allowed_results = [
-        list(range(12)),
-        list(range(11)),
-        list(range(10)),
-        list(range(9)),
-        list(range(8)),
-    ]
+    # on mac os we even sometimes just get 4 items...
+ allowed_results = [list(range(i)) for i in [12, 11, 10, 9, 8, 7, 6, 5, 4]] assert sync_list in allowed_results assert async_list in allowed_results From a05ee7c77f61dc7d4eec6949bc53fa0d52a1c9f4 Mon Sep 17 00:00:00 2001 From: Dave Date: Sun, 15 Dec 2024 21:43:02 +0100 Subject: [PATCH 09/16] tmp --- tests/extract/test_incremental.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index d63dac93f2..0a0c1f8c2d 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -7,6 +7,7 @@ from time import sleep from typing import Any, Optional, Literal, Sequence, Dict, Iterable from unittest import mock +import itertools import duckdb import pyarrow as pa @@ -3853,7 +3854,6 @@ def some_data(): for col in table_schema["columns"].values(): assert "incremental" not in col - @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) @pytest.mark.parametrize("last_value_func", [min, max]) def test_start_range_open(item_type: TestDataItemFormat, last_value_func: Any) -> None: @@ -3960,3 +3960,18 @@ def some_data( # Includes values 5-10 inclusive assert items == expected_items + +def test_incremental_and_limit(): + + @dlt.resource(incremental=dlt.sources.incremental(cursor_path="id", initial_value=0, last_value_func=min)) + def resource(): + for i in range(100): + yield { + "id": i, + "value": str(i), + } + + p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True) + + p.run(resource().add_limit(10)) + From f109a87d51c31b82902068a6aff379f54c1173b5 Mon Sep 17 00:00:00 2001 From: Dave Date: Sun, 15 Dec 2024 22:04:49 +0100 Subject: [PATCH 10/16] add test for testing incremental with limit --- tests/extract/test_incremental.py | 51 ++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 0a0c1f8c2d..205df41a81 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -3854,6 +3854,7 @@ def some_data(): for col in table_schema["columns"].values(): assert "incremental" not in col + @pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS) @pytest.mark.parametrize("last_value_func", [min, max]) def test_start_range_open(item_type: TestDataItemFormat, last_value_func: Any) -> None: @@ -3961,17 +3962,57 @@ def some_data( # Includes values 5-10 inclusive assert items == expected_items -def test_incremental_and_limit(): - @dlt.resource(incremental=dlt.sources.incremental(cursor_path="id", initial_value=0, last_value_func=min)) - def resource(): - for i in range(100): +@pytest.mark.parametrize("offset_by_last_value", [True, False]) +def test_incremental_and_limit(offset_by_last_value: bool): + resource_called = 0 + + # here we check incremental and limit when incremental once when last value cannot be used + # to offset the source, and once when it can. 
+ + @dlt.resource( + table_name="items", + ) + def resource( + incremental=dlt.sources.incremental(cursor_path="id", initial_value=-1, row_order="asc") + ): + range_iterator = ( + range(incremental.start_value + 1, 1000) if offset_by_last_value else range(1000) + ) + for i in range_iterator: + nonlocal resource_called + resource_called += 1 yield { "id": i, "value": str(i), } + resource.add_limit(10) + p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True) - p.run(resource().add_limit(10)) + p.run(resource()) + + # check we have the right number of items + assert len(p.dataset().items.df()) == 10 + assert resource_called == 10 + # check that we have items 0-9 + assert p.dataset().items.df().id.tolist() == list(range(10)) + + # run the next ten + p.run(resource()) + + # check we have the right number of items + assert len(p.dataset().items.df()) == 20 + assert resource_called == 20 if offset_by_last_value else 30 + # check that we have items 0-19 + assert p.dataset().items.df().id.tolist() == list(range(20)) + + # run the next batch + p.run(resource()) + # check we have the right number of items + assert len(p.dataset().items.df()) == 30 + assert resource_called == 30 if offset_by_last_value else 60 + # check that we have items 0-29 + assert p.dataset().items.df().id.tolist() == list(range(30)) From caf92bf08ac2b79a8295cd613c480483ba207635 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 16 Dec 2024 12:36:08 +0100 Subject: [PATCH 11/16] improve limit tests with parallelized case --- tests/extract/test_sources.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index fbc0767744..86646e6369 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -839,7 +839,7 @@ def test_limit_infinite_counter() -> None: @pytest.mark.parametrize("limit", (None, -1, 0, 10)) def test_limit_edge_cases(limit: int) -> None: - r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore + r = dlt.resource(range(20), name="resource").add_limit(limit) # type: ignore @dlt.resource() async def r_async(): @@ -847,18 +847,24 @@ async def r_async(): await asyncio.sleep(0.01) yield i + @dlt.resource(parallelized=True) + def parallelized_resource(): + for i in range(20): + yield i + sync_list = list(r) async_list = list(r_async().add_limit(limit)) + parallelized_list = list(parallelized_resource().add_limit(limit)) + + # all lists should be the same + assert sync_list == async_list == parallelized_list if limit == 10: assert sync_list == list(range(10)) - # we have edge cases where the async list will have one extra item - # possibly due to timing issues, maybe some other implementation problem - assert (async_list == list(range(10))) or (async_list == list(range(11))) elif limit in [None, -1]: - assert sync_list == async_list == list(range(20)) + assert sync_list == list(range(20)) elif limit == 0: - assert sync_list == async_list == [] + assert sync_list == [] else: raise AssertionError(f"Unexpected limit: {limit}") From 2fa2bf8b67aa613d992100423383415d1df77752 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 16 Dec 2024 13:51:01 +0100 Subject: [PATCH 12/16] add backfill example with sql_database --- chunk_pipeline.py | 12 +++ docs/examples/backfill_in_chunks/__init__.py | 0 .../backfill_in_chunks/backfill_in_chunks.py | 85 +++++++++++++++++++ 3 files changed, 97 insertions(+) create mode 100644 chunk_pipeline.py create mode 100644 
docs/examples/backfill_in_chunks/__init__.py create mode 100644 docs/examples/backfill_in_chunks/backfill_in_chunks.py diff --git a/chunk_pipeline.py b/chunk_pipeline.py new file mode 100644 index 0000000000..2a06799f1d --- /dev/null +++ b/chunk_pipeline.py @@ -0,0 +1,12 @@ + + + + + +import pandas as pd + +import dlt +from dlt.sources.sql_database import sql_database + +if __name__ == "__main__": + diff --git a/docs/examples/backfill_in_chunks/__init__.py b/docs/examples/backfill_in_chunks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/examples/backfill_in_chunks/backfill_in_chunks.py b/docs/examples/backfill_in_chunks/backfill_in_chunks.py new file mode 100644 index 0000000000..f0e3e60aa4 --- /dev/null +++ b/docs/examples/backfill_in_chunks/backfill_in_chunks.py @@ -0,0 +1,85 @@ +""" +--- +title: Backfilling in chunks +description: Learn how to backfill in chunks of defined size +keywords: [incremental loading, backfilling, chunks,example] +--- + +In this example, you'll find a Python script that will load from a sql_database source in chunks of defined size. This is useful for backfilling in multiple pipeline runs as +opposed to backfilling in one very large pipeline run which may fail due to memory issues on ephemeral storage or just take a very long time to complete without seeing any +progress in the destination. + +We'll learn how to: + +- Connect to a mysql database with the sql_database source +- Select one table to load and apply incremental loading hints as well as the primary key +- Set the chunk size and limit the number of chunks to load in one pipeline run +- Create a pipeline and backfill the table in the defined chunks +- Use the datasets accessor to inspect and assert the load progress + +""" + +import pandas as pd + +import dlt +from dlt.sources.sql_database import sql_database + + +if __name__ == "__main__": + # NOTE: this is a live table in the rfam database, so the number of final rows may change + TOTAL_TABLE_ROWS = 4178 + RFAM_CONNECTION_STRING = "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam" + + # create sql database source that only loads the family table in chunks of 1000 rows + source = sql_database(RFAM_CONNECTION_STRING, table_names=["family"], chunk_size=1000) + + # we apply some hints to the table, we know the rfam_id is unique and that we can order + # and load incrementally on the created datetime column + source.family.apply_hints( + primary_key="rfam_id", + incremental=dlt.sources.incremental( + cursor_path="created", initial_value=None, row_order="asc" + ), + ) + + # with limit we can limit the number of chunks to load, with a chunk size of 1000 and a limit of 1 + # we will load 1000 rows per pipeline run + source.add_limit(1) + + # create pipeline + pipeline = dlt.pipeline( + pipeline_name="rfam", destination="duckdb", dataset_name="rfam_data", dev_mode=True + ) + + def _assert_unique_row_count(df: pd.DataFrame, num_rows: int) -> None: + """Assert that a dataframe has the correct number of unique rows""" + # NOTE: this check is dependent on reading the full table back from the destination into memory, + # so it is only useful for testing before you do a large backfill. 
+ assert len(df) == num_rows + assert len(set(df.rfam_id.tolist())) == num_rows + + # after the first run, the family table in the destination should contain the first 1000 rows + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), 1000) + + # after the second run, the family table in the destination should contain 1999 rows + # there is some overlap on the incremental to prevent skipping rows + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), 1999) + + # ... + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), 2998) + + # ... + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), 3997) + + # the final run will load all the rows until the end of the table + pipeline.run(source) + _assert_unique_row_count(pipeline.dataset().family.df(), TOTAL_TABLE_ROWS) + + # NOTE: in a production environment you will likely: + # * be using much larger chunk sizes and limits + # * run the pipeline in a loop to load all the rows + # * and programmatically check if the table is fully loaded and abort the loop if this is the case. From dcc3d556cbd5d012b39d47b73e41e7110e901737 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 16 Dec 2024 14:11:17 +0100 Subject: [PATCH 13/16] fix linting --- docs/examples/backfill_in_chunks/backfill_in_chunks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/examples/backfill_in_chunks/backfill_in_chunks.py b/docs/examples/backfill_in_chunks/backfill_in_chunks.py index f0e3e60aa4..a758d67f7b 100644 --- a/docs/examples/backfill_in_chunks/backfill_in_chunks.py +++ b/docs/examples/backfill_in_chunks/backfill_in_chunks.py @@ -5,7 +5,7 @@ keywords: [incremental loading, backfilling, chunks,example] --- -In this example, you'll find a Python script that will load from a sql_database source in chunks of defined size. This is useful for backfilling in multiple pipeline runs as +In this example, you'll find a Python script that will load from a sql_database source in chunks of defined size. This is useful for backfilling in multiple pipeline runs as opposed to backfilling in one very large pipeline run which may fail due to memory issues on ephemeral storage or just take a very long time to complete without seeing any progress in the destination. @@ -78,8 +78,8 @@ def _assert_unique_row_count(df: pd.DataFrame, num_rows: int) -> None: # the final run will load all the rows until the end of the table pipeline.run(source) _assert_unique_row_count(pipeline.dataset().family.df(), TOTAL_TABLE_ROWS) - + # NOTE: in a production environment you will likely: # * be using much larger chunk sizes and limits # * run the pipeline in a loop to load all the rows - # * and programmatically check if the table is fully loaded and abort the loop if this is the case. + # * and programmatically check if the table is fully loaded and abort the loop if this is the case. 
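The closing NOTE of `backfill_in_chunks.py` mentions running the pipeline in a loop and programmatically detecting when the table is fully loaded. A minimal sketch of such a driver loop, assuming the `source` and `pipeline` objects defined in the example above; the stop condition (a run that adds no new unique rows) is just one possible heuristic.

```py
previous_count = -1
while True:
    pipeline.run(source)
    # read back how many unique rows have been loaded so far, as the example itself does
    current_count = len(set(pipeline.dataset().family.df().rfam_id.tolist()))
    if current_count == previous_count:
        # the last run added no new rows, so the backfill is complete
        break
    previous_count = current_count
```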
From 82c3d686f0cff93f520fa006af5e427a9294f811 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 16 Dec 2024 14:11:33 +0100 Subject: [PATCH 14/16] remove extra file --- chunk_pipeline.py | 12 ------------ 1 file changed, 12 deletions(-) delete mode 100644 chunk_pipeline.py diff --git a/chunk_pipeline.py b/chunk_pipeline.py deleted file mode 100644 index 2a06799f1d..0000000000 --- a/chunk_pipeline.py +++ /dev/null @@ -1,12 +0,0 @@ - - - - - -import pandas as pd - -import dlt -from dlt.sources.sql_database import sql_database - -if __name__ == "__main__": - From 9d38f56d1f3a34cb6e721955e185b03cf4ad29d0 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 16 Dec 2024 14:28:26 +0100 Subject: [PATCH 15/16] only wrap iterators on demand --- dlt/extract/items.py | 13 +++++++++++++ dlt/extract/pipe.py | 5 ----- dlt/extract/pipe_iterator.py | 7 ++----- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/dlt/extract/items.py b/dlt/extract/items.py index b3279b1d61..399f67a947 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -115,6 +115,10 @@ def gen(self) -> TPipeStep: """A data generating step""" ... + def replace_gen(self, gen: TPipeStep) -> None: + """Replaces data generating step. Assumes that you know what are you doing""" + ... + def __getitem__(self, i: int) -> TPipeStep: """Get pipe step at index""" ... @@ -250,10 +254,19 @@ def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None: self.max_time = max_time def bind(self, pipe: SupportsPipe) -> "LimitItem": + # we also wrap iterators to make them stoppable + from dlt.extract.utils import ( + wrap_iterator, + ) + + if isinstance(pipe.gen, Iterator): + pipe.replace_gen(wrap_iterator(pipe.gen)) + self.gen = pipe.gen self.count = 0 self.exhausted = False self.start_time = time.time() + return self def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index 411df40c35..b95c9a6c40 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -39,7 +39,6 @@ wrap_compat_transformer, wrap_resource_gen, wrap_async_iterator, - wrap_iterator, ) @@ -296,10 +295,6 @@ def evaluate_gen(self) -> None: if isinstance(self.gen, AsyncIterator): self.replace_gen(wrap_async_iterator(self.gen)) - # we also wrap iterators to make them stoppable - if isinstance(self.gen, Iterator): - self.replace_gen(wrap_iterator(self.gen)) - # evaluate transforms for step_no, step in enumerate(self._steps): # print(f"pipe {self.name} step no {step_no} step({step})") diff --git a/dlt/extract/pipe_iterator.py b/dlt/extract/pipe_iterator.py index 5fa62ffa63..38641c0626 100644 --- a/dlt/extract/pipe_iterator.py +++ b/dlt/extract/pipe_iterator.py @@ -42,7 +42,7 @@ ) from dlt.extract.pipe import Pipe from dlt.extract.items import DataItemWithMeta, PipeItem, ResolvablePipeItem, SourcePipeItem -from dlt.extract.utils import wrap_async_iterator, wrap_iterator +from dlt.extract.utils import wrap_async_iterator from dlt.extract.concurrency import FuturesPool TPipeNextItemMode = Literal["fifo", "round_robin"] @@ -183,12 +183,9 @@ def __next__(self) -> PipeItem: item = pipe_item.item # if item is iterator, then add it as a new source - # we wrap it to make it stoppable if isinstance(item, Iterator): self._sources.append( - SourcePipeItem( - wrap_iterator(item), pipe_item.step, pipe_item.pipe, pipe_item.meta - ) + SourcePipeItem(item, pipe_item.step, pipe_item.pipe, pipe_item.meta) ) pipe_item = None continue From fc013f591698fba41a6c3eda2e1c32997f68385b Mon Sep 17 00:00:00 
2001 From: dave Date: Mon, 16 Dec 2024 17:35:54 +0100 Subject: [PATCH 16/16] move items transform steps into extra file --- dlt/extract/exceptions.py | 1 - dlt/extract/hints.py | 3 +- dlt/extract/incremental/__init__.py | 3 +- dlt/extract/items.py | 165 ------------------------- dlt/extract/items_transform.py | 179 ++++++++++++++++++++++++++++ dlt/extract/pipe.py | 2 +- dlt/extract/resource.py | 6 +- dlt/extract/validation.py | 3 +- dlt/sources/helpers/transform.py | 2 +- tests/extract/test_extract_pipe.py | 3 +- tests/extract/test_incremental.py | 4 +- tests/extract/test_validation.py | 2 +- tests/extract/utils.py | 2 +- 13 files changed, 197 insertions(+), 178 deletions(-) create mode 100644 dlt/extract/items_transform.py diff --git a/dlt/extract/exceptions.py b/dlt/extract/exceptions.py index f4d2b1f302..e832833428 100644 --- a/dlt/extract/exceptions.py +++ b/dlt/extract/exceptions.py @@ -3,7 +3,6 @@ from dlt.common.exceptions import DltException from dlt.common.utils import get_callable_name -from dlt.extract.items import ValidateItem, TDataItems class ExtractorException(DltException): diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 000e5c4cdb..22a0062acf 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -37,7 +37,8 @@ InconsistentTableTemplate, ) from dlt.extract.incremental import Incremental, TIncrementalConfig -from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta, ValidateItem +from dlt.extract.items import TFunHintTemplate, TTableHintTemplate, TableNameMeta +from dlt.extract.items_transform import ValidateItem from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint from dlt.extract.validation import create_item_validator diff --git a/dlt/extract/incremental/__init__.py b/dlt/extract/incremental/__init__.py index 5e7bae49c6..ce06292864 100644 --- a/dlt/extract/incremental/__init__.py +++ b/dlt/extract/incremental/__init__.py @@ -44,7 +44,8 @@ IncrementalArgs, TIncrementalRange, ) -from dlt.extract.items import SupportsPipe, TTableHintTemplate, ItemTransform +from dlt.extract.items import SupportsPipe, TTableHintTemplate +from dlt.extract.items_transform import ItemTransform from dlt.extract.incremental.transform import ( JsonIncremental, ArrowIncremental, diff --git a/dlt/extract/items.py b/dlt/extract/items.py index 399f67a947..ad7447c163 100644 --- a/dlt/extract/items.py +++ b/dlt/extract/items.py @@ -1,23 +1,16 @@ -import inspect -import time - from abc import ABC, abstractmethod from typing import ( Any, Callable, - ClassVar, - Generic, Iterator, Iterable, Literal, Optional, Protocol, - TypeVar, Union, Awaitable, TYPE_CHECKING, NamedTuple, - Generator, ) from concurrent.futures import Future @@ -30,7 +23,6 @@ TDynHintType, ) - TDecompositionStrategy = Literal["none", "scc"] TDeferredDataItems = Callable[[], TDataItems] TAwaitableDataItems = Awaitable[TDataItems] @@ -135,160 +127,3 @@ def has_parent(self) -> bool: def close(self) -> None: """Closes pipe generator""" ... 
- - -ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny] -ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny] -ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]] - - -class ItemTransform(ABC, Generic[TAny]): - _f_meta: ItemTransformFunctionWithMeta[TAny] = None - _f: ItemTransformFunctionNoMeta[TAny] = None - - placement_affinity: ClassVar[float] = 0 - """Tell how strongly an item sticks to start (-1) or end (+1) of pipe.""" - - def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None: - # inspect the signature - sig = inspect.signature(transform_f) - # TODO: use TypeGuard here to get rid of type ignore - if len(sig.parameters) == 1: - self._f = transform_f # type: ignore - else: # TODO: do better check - self._f_meta = transform_f # type: ignore - - def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]": - return self - - @abstractmethod - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - """Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. Returns None to consume item (filter out)""" - pass - - -class FilterItem(ItemTransform[bool]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[bool] - _f: ItemTransformFunctionNoMeta[bool] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - # preserve empty lists - if len(item) == 0: - return item - - if self._f_meta: - item = [i for i in item if self._f_meta(i, meta)] - else: - item = [i for i in item if self._f(i)] - if not item: - # item was fully consumed by the filter - return None - return item - else: - if self._f_meta: - return item if self._f_meta(item, meta) else None - else: - return item if self._f(item) else None - - -class MapItem(ItemTransform[TDataItem]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[TDataItem] - _f: ItemTransformFunctionNoMeta[TDataItem] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - if self._f_meta: - return [self._f_meta(i, meta) for i in item] - else: - return [self._f(i) for i in item] - else: - if self._f_meta: - return self._f_meta(item, meta) - else: - return self._f(item) - - -class YieldMapItem(ItemTransform[Iterator[TDataItem]]): - # mypy needs those to type correctly - _f_meta: ItemTransformFunctionWithMeta[TDataItem] - _f: ItemTransformFunctionNoMeta[TDataItem] - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - if isinstance(item, list): - for i in item: - if self._f_meta: - yield from self._f_meta(i, meta) - else: - yield from self._f(i) - else: - if self._f_meta: - yield from self._f_meta(item, meta) - else: - yield from self._f(item) - - -class ValidateItem(ItemTransform[TDataItem]): - """Base class for validators of data items. - - Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`. - See `PydanticValidator` for possible implementation. 
- """ - - placement_affinity: ClassVar[float] = 0.9 # stick to end but less than incremental - - table_name: str - - def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: - self.table_name = pipe.name - return self - - -class LimitItem(ItemTransform[TDataItem]): - placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental - - def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None: - self.max_items = max_items if max_items is not None else -1 - self.max_time = max_time - - def bind(self, pipe: SupportsPipe) -> "LimitItem": - # we also wrap iterators to make them stoppable - from dlt.extract.utils import ( - wrap_iterator, - ) - - if isinstance(pipe.gen, Iterator): - pipe.replace_gen(wrap_iterator(pipe.gen)) - - self.gen = pipe.gen - self.count = 0 - self.exhausted = False - self.start_time = time.time() - - return self - - def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: - self.count += 1 - - # detect when the limit is reached, max time or yield count - if ( - (self.count == self.max_items) - or (self.max_time and time.time() - self.start_time > self.max_time) - or self.max_items == 0 - ): - self.exhausted = True - if inspect.isgenerator(self.gen): - self.gen.close() - - # if max items is not 0, we return the last item - # otherwise never return anything - if self.max_items != 0: - return item - - # do not return any late arriving items - if self.exhausted: - return None - - return item diff --git a/dlt/extract/items_transform.py b/dlt/extract/items_transform.py new file mode 100644 index 0000000000..12375640bc --- /dev/null +++ b/dlt/extract/items_transform.py @@ -0,0 +1,179 @@ +import inspect +import time + +from abc import ABC, abstractmethod +from typing import ( + Any, + Callable, + ClassVar, + Generic, + Iterator, + Optional, + Union, +) +from concurrent.futures import Future + +from dlt.common.typing import ( + TAny, + TDataItem, + TDataItems, +) + +from dlt.extract.utils import ( + wrap_iterator, +) + +from dlt.extract.items import SupportsPipe + + +ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny] +ItemTransformFunctionNoMeta = Callable[[TDataItem], TAny] +ItemTransformFunc = Union[ItemTransformFunctionWithMeta[TAny], ItemTransformFunctionNoMeta[TAny]] + + +class ItemTransform(ABC, Generic[TAny]): + _f_meta: ItemTransformFunctionWithMeta[TAny] = None + _f: ItemTransformFunctionNoMeta[TAny] = None + + placement_affinity: ClassVar[float] = 0 + """Tell how strongly an item sticks to start (-1) or end (+1) of pipe.""" + + def __init__(self, transform_f: ItemTransformFunc[TAny]) -> None: + # inspect the signature + sig = inspect.signature(transform_f) + # TODO: use TypeGuard here to get rid of type ignore + if len(sig.parameters) == 1: + self._f = transform_f # type: ignore + else: # TODO: do better check + self._f_meta = transform_f # type: ignore + + def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny]": + return self + + @abstractmethod + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + """Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. 
Returns None to consume item (filter out)""" + pass + + +class FilterItem(ItemTransform[bool]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[bool] + _f: ItemTransformFunctionNoMeta[bool] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + # preserve empty lists + if len(item) == 0: + return item + + if self._f_meta: + item = [i for i in item if self._f_meta(i, meta)] + else: + item = [i for i in item if self._f(i)] + if not item: + # item was fully consumed by the filter + return None + return item + else: + if self._f_meta: + return item if self._f_meta(item, meta) else None + else: + return item if self._f(item) else None + + +class MapItem(ItemTransform[TDataItem]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[TDataItem] + _f: ItemTransformFunctionNoMeta[TDataItem] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + if self._f_meta: + return [self._f_meta(i, meta) for i in item] + else: + return [self._f(i) for i in item] + else: + if self._f_meta: + return self._f_meta(item, meta) + else: + return self._f(item) + + +class YieldMapItem(ItemTransform[Iterator[TDataItem]]): + # mypy needs those to type correctly + _f_meta: ItemTransformFunctionWithMeta[TDataItem] + _f: ItemTransformFunctionNoMeta[TDataItem] + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + if isinstance(item, list): + for i in item: + if self._f_meta: + yield from self._f_meta(i, meta) + else: + yield from self._f(i) + else: + if self._f_meta: + yield from self._f_meta(item, meta) + else: + yield from self._f(item) + + +class ValidateItem(ItemTransform[TDataItem]): + """Base class for validators of data items. + + Subclass should implement the `__call__` method to either return the data item(s) or raise `extract.exceptions.ValidationError`. + See `PydanticValidator` for possible implementation. 
+ """ + + placement_affinity: ClassVar[float] = 0.9 # stick to end but less than incremental + + table_name: str + + def bind(self, pipe: SupportsPipe) -> ItemTransform[TDataItem]: + self.table_name = pipe.name + return self + + +class LimitItem(ItemTransform[TDataItem]): + placement_affinity: ClassVar[float] = 1.1 # stick to end right behind incremental + + def __init__(self, max_items: Optional[int], max_time: Optional[float]) -> None: + self.max_items = max_items if max_items is not None else -1 + self.max_time = max_time + + def bind(self, pipe: SupportsPipe) -> "LimitItem": + # we also wrap iterators to make them stoppable + if isinstance(pipe.gen, Iterator): + pipe.replace_gen(wrap_iterator(pipe.gen)) + + self.gen = pipe.gen + self.count = 0 + self.exhausted = False + self.start_time = time.time() + + return self + + def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]: + self.count += 1 + + # detect when the limit is reached, max time or yield count + if ( + (self.count == self.max_items) + or (self.max_time and time.time() - self.start_time > self.max_time) + or self.max_items == 0 + ): + self.exhausted = True + if inspect.isgenerator(self.gen): + self.gen.close() + + # if max items is not 0, we return the last item + # otherwise never return anything + if self.max_items != 0: + return item + + # do not return any late arriving items + if self.exhausted: + return None + + return item diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index b95c9a6c40..e70365b4f4 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -27,12 +27,12 @@ UnclosablePipe, ) from dlt.extract.items import ( - ItemTransform, ResolvablePipeItem, SupportsPipe, TPipeStep, TPipedDataItems, ) +from dlt.extract.items_transform import ItemTransform from dlt.extract.utils import ( check_compat_transformer, simulate_func_call, diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py index 2f4f66a96e..366e6e1a88 100644 --- a/dlt/extract/resource.py +++ b/dlt/extract/resource.py @@ -34,14 +34,16 @@ from dlt.extract.items import ( DataItemWithMeta, - ItemTransformFunc, - ItemTransformFunctionWithMeta, TableNameMeta, +) +from dlt.extract.items_transform import ( FilterItem, MapItem, YieldMapItem, ValidateItem, LimitItem, + ItemTransformFunc, + ItemTransformFunctionWithMeta, ) from dlt.extract.pipe_iterator import ManagedPipeIterator from dlt.extract.pipe import Pipe, TPipeStep diff --git a/dlt/extract/validation.py b/dlt/extract/validation.py index 4cd321b88c..d9fe70a90b 100644 --- a/dlt/extract/validation.py +++ b/dlt/extract/validation.py @@ -8,7 +8,8 @@ from dlt.common.typing import TDataItems from dlt.common.schema.typing import TAnySchemaColumns, TSchemaContract, TSchemaEvolutionMode -from dlt.extract.items import TTableHintTemplate, ValidateItem +from dlt.extract.items import TTableHintTemplate +from dlt.extract.items_transform import ValidateItem _TPydanticModel = TypeVar("_TPydanticModel", bound=PydanticBaseModel) diff --git a/dlt/sources/helpers/transform.py b/dlt/sources/helpers/transform.py index 32843e2aa2..45738fe4fb 100644 --- a/dlt/sources/helpers/transform.py +++ b/dlt/sources/helpers/transform.py @@ -2,7 +2,7 @@ from typing import Any, Dict, Sequence, Union from dlt.common.typing import TDataItem -from dlt.extract.items import ItemTransformFunctionNoMeta +from dlt.extract.items_transform import ItemTransformFunctionNoMeta import jsonpath_ng diff --git a/tests/extract/test_extract_pipe.py b/tests/extract/test_extract_pipe.py index d40639a594..659888269a 100644 
--- a/tests/extract/test_extract_pipe.py +++ b/tests/extract/test_extract_pipe.py @@ -10,7 +10,8 @@ from dlt.common import sleep from dlt.common.typing import TDataItems from dlt.extract.exceptions import CreatePipeException, ResourceExtractionError, UnclosablePipe -from dlt.extract.items import DataItemWithMeta, FilterItem, MapItem, YieldMapItem +from dlt.extract.items import DataItemWithMeta +from dlt.extract.items_transform import FilterItem, MapItem, YieldMapItem from dlt.extract.pipe import Pipe from dlt.extract.pipe_iterator import PipeIterator, ManagedPipeIterator, PipeItem diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index 205df41a81..9ad7d28e88 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -36,7 +36,7 @@ IncrementalPrimaryKeyMissing, ) from dlt.extract.incremental.lag import apply_lag -from dlt.extract.items import ValidateItem +from dlt.extract.items_transform import ValidateItem from dlt.extract.resource import DltResource from dlt.pipeline.exceptions import PipelineStepFailed from dlt.sources.helpers.transform import take_first @@ -3989,7 +3989,7 @@ def resource( resource.add_limit(10) - p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True) + p = dlt.pipeline(pipeline_name="incremental_limit", destination="duckdb", dev_mode=True) p.run(resource()) diff --git a/tests/extract/test_validation.py b/tests/extract/test_validation.py index 138589bb06..3800f333f6 100644 --- a/tests/extract/test_validation.py +++ b/tests/extract/test_validation.py @@ -10,7 +10,7 @@ from dlt.common.libs.pydantic import BaseModel from dlt.extract import DltResource -from dlt.extract.items import ValidateItem +from dlt.extract.items_transform import ValidateItem from dlt.extract.validation import PydanticValidator from dlt.extract.exceptions import ResourceExtractionError from dlt.pipeline.exceptions import PipelineStepFailed diff --git a/tests/extract/utils.py b/tests/extract/utils.py index 7364ef7243..f1de3de093 100644 --- a/tests/extract/utils.py +++ b/tests/extract/utils.py @@ -6,7 +6,7 @@ from dlt.common.typing import TDataItem, TDataItems from dlt.extract.extract import ExtractStorage -from dlt.extract.items import ItemTransform +from dlt.extract.items_transform import ItemTransform from tests.utils import TestDataItemFormat
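Taken together, the series replaces the old generator-wrapping implementation of `add_limit` with a `LimitItem` transform step (moved into `dlt/extract/items_transform.py` by the final patch). A short usage sketch of the resulting API, assuming a build that contains these patches; the resource below is purely illustrative.

```py
import dlt

@dlt.resource
def numbers():
    for i in range(1_000):
        yield i

# add_limit now appends a LimitItem step to the pipe (placement affinity 1.1,
# i.e. right behind incremental) instead of wrapping the resource generator
limited = numbers().add_limit(max_items=10, max_time=5.0)
print(list(limited))  # at most 10 items, fewer if the 5 second budget runs out first
```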