From c7ddf9b48774219eadde305aca220bdfb8fdd375 Mon Sep 17 00:00:00 2001
From: dave
Date: Wed, 11 Dec 2024 15:39:03 +0100
Subject: [PATCH] prevent late arriving items to be forwarded from limit add
 some convenience methods for pipe step management

---
Review notes (ignored by `git am`, not part of the commit message):
 * LimitItem.bind now initialises `exhausted` (it was `exceeded`), which is the
   flag that LimitItem.__call__ reads; previously the first item raised
   AttributeError.
 * Pipe.find unpacks `step_type` when delegating to find_all.
 * The validator setter re-inserts the new validator at the index returned by
   remove_by_type, so replacing a validator keeps its position in the pipe.
 * Blob ids on the `index` lines come from the pre-review revision.

 dlt/extract/items.py    |  5 +++++
 dlt/extract/pipe.py     | 18 +++++++++++++++++-
 dlt/extract/resource.py | 29 +++++++++++------------------
 3 files changed, 33 insertions(+), 19 deletions(-)

diff --git a/dlt/extract/items.py b/dlt/extract/items.py
index 21738abcef..893c33f475 100644
--- a/dlt/extract/items.py
+++ b/dlt/extract/items.py
@@ -249,12 +249,17 @@ def __init__(self, max_items: int) -> None:
     def bind(self, pipe: SupportsPipe) -> "LimitItem":
         self.gen = pipe.gen
         self.count = 0
+        self.exhausted = False
         return self
 
     def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
+        # detect when the limit is reached
         if self.count == self.max_items:
+            self.exhausted = True
             if inspect.isgenerator(self.gen):
                 self.gen.close()
+        # do not return any late arriving items
+        if self.exhausted:
             return None
         self.count += 1
         return item
diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py
index 0e24de9558..411df40c35 100644
--- a/dlt/extract/pipe.py
+++ b/dlt/extract/pipe.py
@@ -123,7 +123,23 @@ def steps(self) -> List[TPipeStep]:
 
     def find(self, *step_type: AnyType) -> int:
         """Finds a step with object of type `step_type`"""
-        return next((i for i, v in enumerate(self._steps) if isinstance(v, step_type)), -1)
+        found = self.find_all(*step_type)
+        return found[0] if found else -1
+
+    def find_all(self, *step_type: AnyType) -> List[int]:
+        """Finds all steps with object of type `step_type`"""
+        return [i for i, v in enumerate(self._steps) if isinstance(v, step_type)]
+
+    def get_by_type(self, *step_type: AnyType) -> TPipeStep:
+        """Gets first step found with object of type `step_type`"""
+        return next((v for v in self._steps if isinstance(v, step_type)), None)
+
+    def remove_by_type(self, *step_type: AnyType) -> int:
+        """Deletes first step found with object of type `step_type`, returns previous index"""
+        step_index = self.find(*step_type)
+        if step_index >= 0:
+            self.remove_step(step_index)
+        return step_index
 
     def __getitem__(self, i: int) -> TPipeStep:
         return self._steps[i]
diff --git a/dlt/extract/resource.py b/dlt/extract/resource.py
index 689e0a91f8..6e47e77aa3 100644
--- a/dlt/extract/resource.py
+++ b/dlt/extract/resource.py
@@ -2,7 +2,7 @@
 from functools import partial
 from typing import (
     AsyncIterable,
-    AsyncIterator,
+    cast,
     ClassVar,
     Callable,
     Iterable,
@@ -215,31 +215,24 @@ def requires_args(self) -> bool:
             return True
 
     @property
-    def incremental(self) -> IncrementalResourceWrapper:
+    def incremental(self) -> Optional[IncrementalResourceWrapper]:
         """Gets incremental transform if it is in the pipe"""
-        incremental: IncrementalResourceWrapper = None
-        step_no = self._pipe.find(IncrementalResourceWrapper, Incremental)
-        if step_no >= 0:
-            incremental = self._pipe.steps[step_no]  # type: ignore
-        return incremental
+        return cast(
+            Optional[IncrementalResourceWrapper],
+            self._pipe.get_by_type(IncrementalResourceWrapper, Incremental),
+        )
 
     @property
     def validator(self) -> Optional[ValidateItem]:
         """Gets validator transform if it is in the pipe"""
-        validator: ValidateItem = None
-        step_no = self._pipe.find(ValidateItem)
-        if step_no >= 0:
-            validator = self._pipe.steps[step_no]  # type: ignore[assignment]
-        return validator
+        return cast(Optional[ValidateItem], self._pipe.get_by_type(ValidateItem))
 
     @validator.setter
     def validator(self, validator: Optional[ValidateItem]) -> None:
         """Add/remove or replace the validator in pipe"""
-        step_no = self._pipe.find(ValidateItem)
-        if step_no >= 0:
-            self._pipe.remove_step(step_no)
+        step_no = self._pipe.remove_by_type(ValidateItem)
         if validator:
             self.add_step(validator, insert_at=step_no if step_no >= 0 else None)
 
     @property
     def max_table_nesting(self) -> Optional[int]:
@@ -370,6 +363,8 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl:  # no
                 " the top level resource."
             )
         else:
+            # remove existing limit if any
+            self._pipe.remove_by_type(LimitItem)
             self.add_step(LimitItem(max_items))
         return self
 
@@ -404,9 +399,7 @@ def add_step(
         return self
 
     def _remove_incremental_step(self) -> None:
-        step_no = self._pipe.find(Incremental, IncrementalResourceWrapper)
-        if step_no >= 0:
-            self._pipe.remove_step(step_no)
+        self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper)
 
     def set_incremental(
         self,