Skip to content

Commit

Permalink
Prevent late-arriving items from being forwarded by the limit step
Browse files Browse the repository at this point in the history
add some convenience methods for pipe step management
  • Loading branch information
sh-rp committed Dec 11, 2024
1 parent f7f216e commit c7ddf9b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 20 deletions.
5 changes: 5 additions & 0 deletions dlt/extract/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,17 @@ def __init__(self, max_items: int) -> None:
def bind(self, pipe: SupportsPipe) -> "LimitItem":
    """Attach this limit step to `pipe` and reset its per-run state.

    Args:
        pipe: The pipe whose `gen` attribute is remembered so that it can be
            closed once the limit is reached.

    Returns:
        This instance, so the call can be chained.
    """
    self.gen = pipe.gen
    self.count = 0
    # Must use the same attribute name that `__call__` reads and writes
    # (`exhausted`); initialising a differently named flag leaves `exhausted`
    # undefined until the limit is hit.
    self.exhausted = False
    return self

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
    """Pass `item` through until `max_items` items have been forwarded.

    Once the running count equals `max_items` the step marks itself as
    exhausted and, when the bound source is a generator, closes it. Every
    item seen while exhausted is dropped by returning None.

    Args:
        item: The data item travelling through the pipe.
        meta: Accepted for the step call signature; not used here.

    Returns:
        `item` while the limit has not been reached, otherwise None.
    """
    limit_reached = self.count == self.max_items
    if limit_reached:
        self.exhausted = True
        if inspect.isgenerator(self.gen):
            self.gen.close()

    # items arriving after the limit was hit are never forwarded
    if not self.exhausted:
        self.count += 1
        return item
    return None
18 changes: 17 additions & 1 deletion dlt/extract/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,23 @@ def steps(self) -> List[TPipeStep]:

def find(self, *step_type: AnyType) -> int:
    """Finds the first step that is an instance of any of `step_type`.

    Args:
        *step_type: One or more types to match steps against.

    Returns:
        Index of the first matching step in `self._steps`, or -1 when no
        step matches.
    """
    # Short-circuits on the first match instead of collecting every index.
    return next((i for i, v in enumerate(self._steps) if isinstance(v, step_type)), -1)

def find_all(self, *step_type: AnyType) -> List[int]:
    """Collects the indexes of every step that is an instance of `step_type`.

    Args:
        *step_type: One or more types to match steps against.

    Returns:
        Indexes into `self._steps` in ascending order; empty when nothing matches.
    """
    positions = []
    for index, step in enumerate(self._steps):
        if isinstance(step, step_type):
            positions.append(index)
    return positions

def get_by_type(self, *step_type: AnyType) -> TPipeStep:
    """Gets the first step that is an instance of any of `step_type`.

    Args:
        *step_type: One or more types to match steps against.

    Returns:
        The first matching step object, or None when no step matches.
    """
    for step in self._steps:
        if isinstance(step, step_type):
            return step
    return None

def remove_by_type(self, *step_type: AnyType) -> int:
    """Removes the first step that is an instance of any of `step_type`.

    Args:
        *step_type: One or more types to match steps against.

    Returns:
        The index the removed step occupied, or the negative value reported
        by `find` when there was nothing to remove.
    """
    position = self.find(*step_type)
    if position < 0:
        # nothing matched, leave the steps untouched
        return position
    self.remove_step(position)
    return position

def __getitem__(self, i: int) -> TPipeStep:
    """Returns the step stored at position `i` of the internal step list."""
    step = self._steps[i]
    return step
Expand Down
31 changes: 12 additions & 19 deletions dlt/extract/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from functools import partial
from typing import (
AsyncIterable,
AsyncIterator,
cast,
ClassVar,
Callable,
Iterable,
Expand Down Expand Up @@ -215,31 +215,24 @@ def requires_args(self) -> bool:
return True

@property
def incremental(self) -> Optional[IncrementalResourceWrapper]:
    """Gets incremental transform if it is in the pipe

    Returns:
        The first pipe step that is an `IncrementalResourceWrapper` or an
        `Incremental`, or None when the pipe holds no such step.
    """
    # `get_by_type` is typed as returning a generic pipe step, so narrow it
    # for callers with a cast instead of a `type: ignore`.
    return cast(
        Optional[IncrementalResourceWrapper],
        self._pipe.get_by_type(IncrementalResourceWrapper, Incremental),
    )

@property
def validator(self) -> Optional[ValidateItem]:
    """Gets validator transform if it is in the pipe

    Returns:
        The first `ValidateItem` step found in the pipe, or None when the
        pipe has no validator.
    """
    return cast(Optional[ValidateItem], self._pipe.get_by_type(ValidateItem))

@validator.setter
def validator(self, validator: Optional[ValidateItem]) -> None:
    """Add/remove or replace the validator in pipe

    Any existing `ValidateItem` step is removed first. When `validator` is
    truthy it is then added exactly once: at the index the removed validator
    occupied, or with `insert_at=None` when there was none.

    Args:
        validator: The new validator step, or None to only remove the
            current one.
    """
    # `remove_by_type` reports the index of the removed step (negative when
    # nothing was removed), which lets a replacement keep its position.
    previous_index = self._pipe.remove_by_type(ValidateItem)
    if validator:
        self.add_step(validator, insert_at=previous_index if previous_index >= 0 else None)

@property
def max_table_nesting(self) -> Optional[int]:
Expand Down Expand Up @@ -370,6 +363,8 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no
" the top level resource."
)
else:
# remove existing limit if any
self._pipe.remove_by_type(LimitItem)
self.add_step(LimitItem(max_items))

return self
Expand Down Expand Up @@ -404,9 +399,7 @@ def add_step(
return self

def _remove_incremental_step(self) -> None:
    """Removes the first `Incremental` or `IncrementalResourceWrapper` step from the pipe, if any."""
    # A single helper call: doing a manual find/remove and then calling the
    # helper as well would drop two matching steps instead of one.
    self._pipe.remove_by_type(Incremental, IncrementalResourceWrapper)

def set_incremental(
self,
Expand Down

0 comments on commit c7ddf9b

Please sign in to comment.