Commit

renames start_out_of_range and end_out_of_range to is_below_initial_value and is_above_end_value
willi-mueller committed Jul 22, 2024
1 parent b84c6e3 commit 95ce2cb
Showing 9 changed files with 30 additions and 32 deletions.
10 changes: 5 additions & 5 deletions dlt/extract/incremental/__init__.py
@@ -146,9 +146,9 @@ def __init__(
"""State dictionary cached on first access"""
super().__init__(lambda x: x) # TODO:

self.end_out_of_range: bool = False
self.is_above_end_value: bool = False
"""Becomes true on the first item that is out of range of `end_value`. I.e. when using `max` function this means a value that is equal or higher"""
self.start_out_of_range: bool = False
self.is_below_initial_value: bool = False
"""Becomes true on the first item that is out of range of `start_value`. I.e. when using `max` this is a value that is lower than `start_value`"""

self._transformers: Dict[str, IncrementalTransform] = {}
@@ -335,7 +335,7 @@ def last_value(self) -> Optional[TCursorValue]:
def _transform_item(
self, transformer: IncrementalTransform, row: TDataItem
) -> Optional[TDataItem]:
row, self.start_out_of_range, self.end_out_of_range = transformer(row)
row, self.is_below_initial_value, self.is_above_end_value = transformer(row)
# if we know that rows are ordered we can close the generator automatically
# mind that closing pipe will not immediately close processing. it only closes the
# generator so this page will be fully processed
@@ -450,9 +450,9 @@ def can_close(self) -> bool:
# ordered ascending, check if we cross upper bound
return (
self.row_order == "asc"
and self.end_out_of_range
and self.is_above_end_value
or self.row_order == "desc"
and self.start_out_of_range
and self.is_below_initial_value
)

def __str__(self) -> str:
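For context on the renamed attributes, here is a minimal sketch of a resource that consults them after each yield. The resource, its data, and the bounds are invented; only `dlt.resource`, `dlt.sources.incremental`, and the two flags renamed in this commit are assumed from the library.

```py
import dlt

@dlt.resource
def ordered_events(
    updated_at=dlt.sources.incremental("updated_at", initial_value=0, end_value=100)
):
    # Hypothetical ascending pages; a real resource would fetch these from an API.
    for page_start in range(0, 200, 10):
        page = [{"updated_at": i} for i in range(page_start, page_start + 10)]
        yield page
        # After each yield the incremental instance re-evaluates its flags:
        # is_above_end_value becomes True once a yielded item reaches end_value.
        if updated_at.is_above_end_value:
            return
```

The explicit `return` mirrors what `can_close()` does automatically when `row_order` is set.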
14 changes: 7 additions & 7 deletions dlt/extract/incremental/transform.py
@@ -276,9 +276,9 @@ def __call__(
elif primary_key is None:
unique_columns = tbl.schema.names

start_out_of_range = end_out_of_range = False
is_below_initial_value = is_above_end_value = False
if not tbl: # row is None or empty arrow table
return tbl, start_out_of_range, end_out_of_range
return tbl, is_below_initial_value, is_above_end_value

if self.last_value_func is max:
compute = pa.compute.max
@@ -319,13 +319,13 @@ def __call__(
tbl = tbl.filter(end_compare(tbl[cursor_path], end_value_scalar))
# Is max row value higher than end value?
# NOTE: pyarrow bool *always* evaluates to python True. `as_py()` is necessary
end_out_of_range = not end_compare(row_value_scalar, end_value_scalar).as_py()
is_above_end_value = not end_compare(row_value_scalar, end_value_scalar).as_py()

if self.start_value is not None:
start_value_scalar = to_arrow_scalar(self.start_value, cursor_data_type)
# Remove rows lower or equal than the last start value
keep_filter = last_value_compare(tbl[cursor_path], start_value_scalar)
start_out_of_range = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py())
is_below_initial_value = bool(pa.compute.any(pa.compute.invert(keep_filter)).as_py())
tbl = tbl.filter(keep_filter)
if not self.deduplication_disabled:
# Deduplicate after filtering old values
@@ -373,11 +373,11 @@ def __call__(
)

if len(tbl) == 0:
return None, start_out_of_range, end_out_of_range
return None, is_below_initial_value, is_above_end_value
try:
tbl = pyarrow.remove_columns(tbl, ["_dlt_index"])
except KeyError:
pass
if is_pandas:
return tbl.to_pandas(), start_out_of_range, end_out_of_range
return tbl, start_out_of_range, end_out_of_range
return tbl.to_pandas(), is_below_initial_value, is_above_end_value
return tbl, is_below_initial_value, is_above_end_value
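For readers unfamiliar with the Arrow path above, a standalone sketch of the boundary checks on made-up data. The column name and bound values are invented, only standard `pyarrow.compute` functions are used, and it mirrors the gist of the transform rather than its exact code.

```py
import pyarrow as pa
import pyarrow.compute as pc

tbl = pa.table({"updated_at": [5, 12, 20, 31]})
start_value = pa.scalar(10)
end_value = pa.scalar(30)

# Keep rows at or above start_value; if any row was dropped, we went below the initial value.
keep_filter = pc.greater_equal(tbl["updated_at"], start_value)
is_below_initial_value = pc.any(pc.invert(keep_filter)).as_py()
tbl = tbl.filter(keep_filter)

# Compare the maximum row value against end_value. As the NOTE in the diff points out,
# a pyarrow boolean scalar is not a Python bool, so as_py() is required before negating.
row_value = pc.max(tbl["updated_at"])
is_above_end_value = not pc.less(row_value, end_value).as_py()

print(is_below_initial_value, is_above_end_value)  # True True
```

The real transform additionally deduplicates rows and converts pandas input back and forth, which is omitted here.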
2 changes: 1 addition & 1 deletion docs/examples/incremental_loading/incremental_loading.py
@@ -89,7 +89,7 @@ def ticket_events(
yield page
# stop loading when using end_value and end is reached.
# unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves
if timestamp.end_out_of_range:
if timestamp.is_above_end_value:
return

return ticket_events
2 changes: 1 addition & 1 deletion docs/examples/qdrant_zendesk/qdrant_zendesk.py
@@ -95,7 +95,7 @@ def tickets_data(

# stop loading when using end_value and end is reached.
# unfortunately, Zendesk API does not have the "end_time" parameter, so we stop iterating ourselves
if updated_at.end_out_of_range:
if updated_at.is_above_end_value:
return

return tickets_data
4 changes: 2 additions & 2 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -662,7 +662,7 @@ incremental and exit yield loop when true.
:::

:::tip
The `dlt.sources.incremental` instance provides `start_out_of_range` and `end_out_of_range`
The `dlt.sources.incremental` instance provides `is_below_initial_value` and `is_above_end_value`
attributes which are set when the resource yields an element with a higher/lower cursor value than the
initial or end values. If you do not want `dlt` to stop processing automatically and instead to handle such events yourself, do not specify `row_order`:
```py
@@ -681,7 +681,7 @@ def tickets(
):
yield page
# Stop loading when we reach the end value
if updated_at.end_out_of_range:
if updated_at.is_above_end_value:
return

```
5 changes: 2 additions & 3 deletions docs/website/docs/getting-started-snippets.py
@@ -1,4 +1,3 @@
import os
from tests.pipeline.utils import assert_load_info


@@ -159,7 +158,7 @@ def get_issues(

# stop requesting pages if the last element was already older than initial value
# note: incremental will skip those items anyway, we just do not want to use the api limits
if created_at.start_out_of_range:
if created_at.is_below_initial_value:
break

# get next page
@@ -241,7 +240,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")):

# stop requesting pages if the last element was already older than initial value
# note: incremental will skip those items anyway, we just do not want to use the api limits
if last_created_at.start_out_of_range:
if last_created_at.is_below_initial_value:
break

# get next page
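As a self-contained counterpart to the fragment above, a sketch of a descending page loop that breaks once the initial value is crossed. The pages are simulated stand-ins for the GitHub API calls in the real snippet, and the resource shown here is hypothetical.

```py
import dlt

@dlt.resource(table_name="issues")
def get_issues(
    created_at=dlt.sources.incremental("created_at", initial_value=50)
):
    # Simulated pages returned newest-first, mimicking sort=created&direction=desc.
    for page_start in range(100, 0, -10):
        page = [{"id": i, "created_at": i} for i in range(page_start, page_start - 10, -1)]
        yield page
        # Stop requesting pages once an item older than initial_value has appeared.
        # Incremental filtering drops those rows anyway; breaking only saves API calls.
        if created_at.is_below_initial_value:
            break
```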
5 changes: 2 additions & 3 deletions docs/website/docs/tutorial/load-data-from-an-api-snippets.py
@@ -1,4 +1,3 @@
import os
from tests.pipeline.utils import assert_load_info


@@ -71,7 +70,7 @@ def get_issues(
# older than initial value
# Note: incremental will skip those items anyway, we just
# do not want to use the api limits
if created_at.start_out_of_range:
if created_at.is_below_initial_value:
break

# get next page
@@ -160,7 +159,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")):

# stop requesting pages if the last element was already older than initial value
# note: incremental will skip those items anyway, we just do not want to use the api limits
if last_created_at.start_out_of_range:
if last_created_at.is_below_initial_value:
break

# get next page
@@ -38,7 +38,7 @@ def repo_events(last_created_at=dlt.sources.incremental("created_at")):
# the initial value
# note: incremental will skip those items anyway, we just do not
# want to use the api limits
if last_created_at.start_out_of_range:
if last_created_at.is_below_initial_value:
break

# get next page
18 changes: 9 additions & 9 deletions tests/extract/test_incremental.py
@@ -1692,7 +1692,7 @@ def custom_last_value(items):

@pytest.mark.parametrize("item_type", ALL_TEST_DATA_ITEM_FORMATS)
def test_out_of_range_flags(item_type: TestDataItemFormat) -> None:
"""Test incremental.start_out_of_range / end_out_of_range flags are set when items are filtered out"""
"""Test incremental.is_below_initial_value / is_above_end_value flags are set when items are filtered out"""

@dlt.resource
def descending(
@@ -1705,9 +1705,9 @@ def descending(
yield data_to_item_format(item_type, data)
# Assert flag is set only on the first item < initial_value
if all(item > 9 for item in chunk):
assert updated_at.start_out_of_range is False
assert updated_at.is_below_initial_value is False
else:
assert updated_at.start_out_of_range is True
assert updated_at.is_below_initial_value is True
return

@dlt.resource
Expand All @@ -1721,9 +1721,9 @@ def ascending(
yield data_to_item_format(item_type, data)
# Flag is set only when end_value is reached
if all(item < 45 for item in chunk):
assert updated_at.end_out_of_range is False
assert updated_at.is_above_end_value is False
else:
assert updated_at.end_out_of_range is True
assert updated_at.is_above_end_value is True
return

@dlt.resource
Expand All @@ -1736,9 +1736,9 @@ def descending_single_item(
data = [{"updated_at": i}]
yield from data_to_item_format(item_type, data)
if i >= 10:
assert updated_at.start_out_of_range is False
assert updated_at.is_below_initial_value is False
else:
assert updated_at.start_out_of_range is True
assert updated_at.is_below_initial_value is True
return

@dlt.resource
Expand All @@ -1751,9 +1751,9 @@ def ascending_single_item(
data = [{"updated_at": i}]
yield from data_to_item_format(item_type, data)
if i < 22:
assert updated_at.end_out_of_range is False
assert updated_at.is_above_end_value is False
else:
assert updated_at.end_out_of_range is True
assert updated_at.is_above_end_value is True
return

pipeline = dlt.pipeline(pipeline_name="incremental_" + uniq_id(), destination="duckdb")
