[experiment] Add resource time limit and rate limiting #1485

Closed · wants to merge 1 commit
46 changes: 43 additions & 3 deletions dlt/extract/resource.py
@@ -1,4 +1,5 @@
import inspect
import time
from functools import partial
from typing import (
AsyncIterable,
@@ -11,6 +12,7 @@
Union,
Any,
Optional,
Literal,
)
from typing_extensions import TypeVar, Self

@@ -345,8 +347,14 @@ def add_filter(
self._pipe.insert_step(FilterItem(item_filter), insert_at)
return self

def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # noqa: A003
"""Adds a limit `max_items` to the resource pipe.
def add_limit(
self: TDltResourceImpl,
max_items: Optional[int] = None,
max_time: Optional[float] = None,
min_wait: Optional[float] = None,
) -> TDltResourceImpl: # noqa: A003
"""Adds a limit `max_items` to the resource pipe


This mutates the encapsulated generator to stop after `max_items` items are yielded. This is useful for testing and debugging.

@@ -356,7 +364,9 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no
3. Async resources with a limit added may occasionally produce one item more than the limit on some runs. This behavior is not deterministic.

Args:
max_items (int): The maximum number of items to yield
max_items (int): The maximum number of items to yield, set to None for no limit
max_time (float): The maximum number of seconds for this generator to run after it was opened, set to None for no limit
min_wait (float): The minimum number of seconds to wait between iterations (useful for rate limiting)
Returns:
"DltResource": returns self
"""
@@ -368,10 +378,22 @@ def add_limit(self: TDltResourceImpl, max_items: int) -> TDltResourceImpl: # no
def _gen_wrap(gen: TPipeStep) -> TPipeStep:
"""Wrap a generator to take the first `max_items` records"""

if ((max_items is not None and max_items >= 0) or max_time) and not self.incremental:
from dlt.common import logger

logger.warning(
f"You have added a max_items or max_time limit to resource {self.name}, but no"
" incremental was declared."
)

Collaborator Author: this will need to be improved a bit, but I think this is quite a nice solution.

Collaborator Author: also, generally speaking, it would be cool to have some wrapper around log messages to be able to test them; maybe mocking would be enough, not sure.

# zero items should produce empty generator
if max_items == 0:
return

# vars needed for max time and rate limiting
start_time: float = time.time()
last_iteration: float = start_time

count = 0
is_async_gen = False
if callable(gen):
@@ -384,14 +406,32 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep:

try:
for i in gen: # type: ignore # TODO: help me fix this later
# return item to caller
yield i

# evaluate stop conditions
if i is not None:
count += 1
# async gen yields awaitable so we must count one awaitable more
# so the previous one is evaluated and yielded.
# new awaitable will be cancelled
if max_items is not None and count == max_items + int(is_async_gen):
return
# if we crossed the max time, we will stop
if max_time and time.time() - start_time > max_time:
return

# apply rate limiting
if min_wait:
while (last_iteration + min_wait) - time.time() > 0:
# we give control back to the pipe iterator
yield None
time.sleep(0.1)

Collaborator Author: we could make this configurable; I am not sure whether it is needed, though.

# remember last iteration time
if min_wait:
last_iteration = time.time()

finally:
if inspect.isgenerator(gen):
gen.close()
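For context, a minimal usage sketch of the extended add_limit signature from the diff above (not part of the PR; the "ticker" resource and the concrete numbers are made up for illustration and assume this branch is installed):

import itertools
import time

import dlt


@dlt.resource(name="ticker")
def ticker():
    # stand-in for an endless / paginated API resource
    for i in itertools.count():
        time.sleep(0.05)
        yield {"value": i}


# stop after 50 items or 30 seconds, whichever comes first,
# and wait at least 0.5s between items (simple rate limiting)
limited = ticker().add_limit(max_items=50, max_time=30, min_wait=0.5)
print(list(limited))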
122 changes: 122 additions & 0 deletions tests/extract/test_limits.py
@@ -0,0 +1,122 @@
import dlt
import itertools
import pytest
import asyncio
import os
import time


@pytest.fixture(autouse=True)
def set_round_robin():
"""this can be removed after the round robin PR is merged to devel"""
os.environ["EXTRACT__NEXT_ITEM_MODE"] = "round_robin"
yield
del os.environ["EXTRACT__NEXT_ITEM_MODE"]


def test_item_limit_infinite_counter() -> None:
r = dlt.resource(itertools.count(), name="infinity").add_limit(10)
assert list(r) == list(range(10))


def test_item_limit_source() -> None:
os.environ["EXTRACT__NEXT_ITEM_MODE"] = "fifo"

def mul_c(item):
yield from "A" * (item + 2)

@dlt.source
def infinite_source():
for idx in range(3):
r = dlt.resource(itertools.count(), name=f"infinity_{idx}").add_limit(10)
yield r
yield r | dlt.transformer(name=f"mul_c_{idx}")(mul_c)

# transformer is not limited to 2 elements, infinite resource is, we have 3 resources
assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3


@pytest.mark.parametrize("limit", (None, -1, 0, 10))
def test_item_limit_edge_cases(limit: int) -> None:
r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore

@dlt.resource()
async def r_async():
for i in range(20):
await asyncio.sleep(0.01)
yield i

sync_list = list(r)
async_list = list(r_async().add_limit(limit))

if limit == 10:
assert sync_list == list(range(10))
# we have edge cases where the async list will have one extra item,
# possibly due to timing issues or some other implementation problem
assert (async_list == list(range(10))) or (async_list == list(range(11)))
elif limit in [None, -1]:
assert sync_list == async_list == list(range(20))
elif limit == 0:
assert sync_list == async_list == []
else:
raise AssertionError(f"Unexpected limit: {limit}")


def test_time_limit() -> None:
@dlt.resource()
def r():
for i in range(100):
time.sleep(0.1)
yield i

@dlt.resource()
async def r_async():
for i in range(100):
await asyncio.sleep(0.1)
yield i

sync_list = list(r().add_limit(max_time=1))
async_list = list(r_async().add_limit(max_time=1))

# we should have extracted 10 items within 1 second, sleep is included in the resource
allowed_results = [
list(range(12)),
list(range(11)),
list(range(10)),
list(range(9)),
list(range(8)),
]
assert sync_list in allowed_results
assert async_list in allowed_results


def test_min_wait() -> None:
@dlt.resource()
def r():
for i in range(100):
yield i

@dlt.resource()
async def r_async():
for i in range(100):
yield i

sync_list = list(r().add_limit(max_time=1, min_wait=0.2))
async_list = list(r_async().add_limit(max_time=1, min_wait=0.2))

# we should have extracted about 5 items within 1 second, sleep is done via min_wait
allowed_results = [
list(range(3)),
list(range(4)),
list(range(5)),
list(range(6)),
list(range(7)),
]
assert sync_list in allowed_results
assert async_list in allowed_results


# TODO: Test behavior in the pipe iterator with more than one resource and different extraction modes set.
# We want to see whether overall rate limiting can be achieved with fifo, which will be useful for APIs.
# Round robin should apply rate limiting to each resource individually and not get stuck on one sleeping iterator.
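# A possible shape for such a test, sketched under assumptions: the resource names,
# the timings and the loose bounds below are illustrative guesses, not measured behavior
# (it reuses the module-level imports of this file).
def test_min_wait_multiple_resources_round_robin() -> None:
    os.environ["EXTRACT__NEXT_ITEM_MODE"] = "round_robin"

    @dlt.source
    def two_limited_resources():
        for name in ["first", "second"]:
            yield dlt.resource(itertools.count(), name=name).add_limit(max_time=1, min_wait=0.2)

    start = time.time()
    items = list(two_limited_resources())
    elapsed = time.time() - start

    # both resources should make progress, i.e. round robin must not get stuck
    # on one iterator that is sleeping between items
    assert len(items) >= 2
    # the per-resource max_time limits should overlap rather than add up
    assert elapsed < 3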

# It would also be nice to be able to test the logger warnings when no incremental is present.
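# A minimal sketch of how that could look, assuming plain unittest.mock patching of
# dlt.common.logger is acceptable (the test name and assertion are illustrative):
from unittest import mock


def test_limit_without_incremental_warns() -> None:
    with mock.patch("dlt.common.logger.warning") as warn_mock:
        # iterate so the wrapped generator actually runs and may emit the warning
        list(dlt.resource(range(10), name="no_incremental").add_limit(10))
    assert warn_mock.called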
46 changes: 0 additions & 46 deletions tests/extract/test_sources.py
@@ -830,52 +830,6 @@ def test_add_transformer_right_pipe() -> None:
iter([1, 2, 3]) | dlt.resource(lambda i: i * 3, name="lambda")


Collaborator Author: these three tests were moved to the new location, where a couple more tests will be added specifically for the limits.

def test_limit_infinite_counter() -> None:
r = dlt.resource(itertools.count(), name="infinity").add_limit(10)
assert list(r) == list(range(10))


@pytest.mark.parametrize("limit", (None, -1, 0, 10))
def test_limit_edge_cases(limit: int) -> None:
r = dlt.resource(range(20), name="infinity").add_limit(limit) # type: ignore

@dlt.resource()
async def r_async():
for i in range(20):
await asyncio.sleep(0.01)
yield i

sync_list = list(r)
async_list = list(r_async().add_limit(limit))

if limit == 10:
assert sync_list == list(range(10))
# we have edge cases where the async list will have one extra item
# possibly due to timing issues, maybe some other implementation problem
assert (async_list == list(range(10))) or (async_list == list(range(11)))
elif limit in [None, -1]:
assert sync_list == async_list == list(range(20))
elif limit == 0:
assert sync_list == async_list == []
else:
raise AssertionError(f"Unexpected limit: {limit}")


def test_limit_source() -> None:
def mul_c(item):
yield from "A" * (item + 2)

@dlt.source
def infinite_source():
for idx in range(3):
r = dlt.resource(itertools.count(), name=f"infinity_{idx}").add_limit(10)
yield r
yield r | dlt.transformer(name=f"mul_c_{idx}")(mul_c)

# transformer is not limited to 2 elements, infinite resource is, we have 3 resources
assert list(infinite_source().add_limit(2)) == ["A", "A", 0, "A", "A", "A", 1] * 3


def test_source_state() -> None:
@dlt.source
def test_source(expected_state):