
Simple source rate limiting #2149

Open · wants to merge 5 commits into base: devel
37 changes: 31 additions & 6 deletions dlt/extract/pipe_iterator.py
@@ -1,9 +1,11 @@
import inspect
import types
import time
from typing import (
AsyncIterator,
ClassVar,
Dict,
Optional,
Sequence,
Union,
Iterator,
@@ -52,6 +54,8 @@ class PipeIteratorConfiguration(BaseConfiguration):
futures_poll_interval: float = 0.01
copy_on_fork: bool = False
next_item_mode: str = "round_robin"
rate_limit: Optional[float] = None
Contributor commented:

Could this be a callback? Or, even better, a generic `RateLimiter` instance that can be shared between different pipes and updated dynamically?

I am thinking about #1485 (comment)

which describes an API that defines a global rate limit across all API endpoints.

If I have different resources (real-world example: https://github.com/dlt-hub/verified-sources/pull/587/files#diff-0fc4db143e89ab087ef737341bc4d93a631141a0bb633c31831ca22bb072c146R100-R105) that live in different pipes but share one limit, then I can't easily express this with this API, as I wouldn't know how many requests are being made in the other pipes (you can see from the code above that the pipes may even depend on the user configuration).
It also wouldn't be possible to make the rate limit dynamic. E.g. let's say I start a pipeline run that takes 2 hours, but during that time an external system uses part of the shared quota for a while; then I'd like to adjust the rate limiting here. I.e.:

t0: I start pipe 1, I have 900 requests/s
t1: another system starts working and uses 100 requests/s, so I want to reduce pipe 1 to 800 req/s
t2: the other system stops its work, so I want to raise pipe 1 back up to 900 req/s

or

t0: I start pipe 1 and pipe 2 in parallel, I have 900 req/s in total, so they share 450 req/s each
t1: pipe 2 is much faster and finishes, so I want to increase pipe 1 to the full 900 req/s

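A shared, dynamically adjustable limiter along these lines could be sketched as follows. This is purely hypothetical and not part of this PR; the name `SharedRateLimiter` and its API are made up for illustration:

```python
import threading
import time


class SharedRateLimiter:
    """Hypothetical limiter shared between pipes; the rate can be changed at runtime."""

    def __init__(self, rate: float) -> None:
        self._lock = threading.Lock()
        self._rate = rate
        self._last = 0.0

    def set_rate(self, rate: float) -> None:
        # another component (or an operator) can adjust the shared quota mid-run
        with self._lock:
            self._rate = rate

    def acquire(self) -> None:
        # block until the shared interval has elapsed since the last acquisition
        while True:
            with self._lock:
                interval = 1.0 / self._rate
                now = time.monotonic()
                if now - self._last >= interval:
                    self._last = now
                    return
                wait = interval - (now - self._last)
            time.sleep(wait)
```

Every pipe would call `acquire()` on the same instance, which addresses both scenarios above: the limit is enforced globally, and `set_rate` covers the t1/t2 adjustments.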
Collaborator (author) replied:


@joscha could you maybe collect your requirements for rate limiting in a ticket? I'm not sure this change will make it into the code soon, but it is meant as a fairly simple rate-limiting mechanism to address requirements we hear a lot from the community. The idea of using exponential backoff and respecting well-defined rate-limiting headers is also under consideration and would go into the REST API layer.

Contributor replied:


I can most certainly put it in a ticket. I am also not suggesting that having rate limiting here is bad; I am just not sure the current implementation is flexible enough for more complex systems. Having one way to define rate limits and exponential backoff would be amazing, as it would mean we only have one way to define it, and the next time we add the feature to another layer (like the `RESTClient`, for example) we can just reuse what's already known.

Thinking in terms of how sources and/or pipes are structured and limiting them also does not always translate easily to the underlying systems these sources/resources pull data from. For resources based on an external REST API, for example, we possibly wouldn't be interested in limiting the work inside the resource, but only in how many requests leave the system (possibly even per endpoint, as APIs often define different limits for different endpoints, while a dlt resource might use more than one).
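Per-endpoint limits as described here could be expressed along these lines. Again a hypothetical sketch, not a dlt API; `EndpointRateLimiter` and its `throttle` method are invented for illustration:

```python
import time
from collections import defaultdict


class EndpointRateLimiter:
    """Hypothetical: a separate fixed-interval limit per endpoint key."""

    def __init__(self, limits: dict) -> None:
        # limits maps an endpoint key to the max requests per second for it
        self._intervals = {k: 1.0 / v for k, v in limits.items()}
        self._last = defaultdict(float)

    def throttle(self, endpoint: str) -> None:
        # sleep just long enough to honor that endpoint's interval; endpoints
        # without a configured limit pass through unthrottled
        interval = self._intervals.get(endpoint, 0.0)
        wait = interval - (time.monotonic() - self._last[endpoint])
        if wait > 0:
            time.sleep(wait)
        self._last[endpoint] = time.monotonic()
```

A resource that hits several endpoints would call `throttle("issues")`, `throttle("pulls")`, etc. before each outgoing request, so the limit tracks the external system rather than the pipe structure.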

"""Rate limit in max items per second"""
__section__: ClassVar[str] = known_sections.EXTRACT

def __init__(
@@ -61,6 +65,7 @@ def __init__(
futures_poll_interval: float,
sources: List[SourcePipeItem],
next_item_mode: TPipeNextItemMode,
rate_limit: Optional[float],
) -> None:
self._sources = sources
self._next_item_mode: TPipeNextItemMode = next_item_mode
@@ -71,6 +76,8 @@ def __init__(
poll_interval=futures_poll_interval,
max_parallel_items=max_parallel_items,
)
self._rate_limit = rate_limit
self._last_source_item_time = 0.0

@classmethod
@with_config(spec=PipeIteratorConfiguration)
@@ -82,6 +89,7 @@ def from_pipe(
workers: int = 5,
futures_poll_interval: float = 0.01,
next_item_mode: TPipeNextItemMode = "round_robin",
rate_limit: Optional[float] = None,
) -> "PipeIterator":
# join all dependent pipes
if pipe.parent:
@@ -95,7 +103,9 @@ def from_pipe(

# create extractor
sources = [SourcePipeItem(pipe.gen, 0, pipe, None)]
return cls(max_parallel_items, workers, futures_poll_interval, sources, next_item_mode)
return cls(
max_parallel_items, workers, futures_poll_interval, sources, next_item_mode, rate_limit
)

@classmethod
@with_config(spec=PipeIteratorConfiguration)
@@ -109,6 +119,7 @@ def from_pipes(
futures_poll_interval: float = 0.01,
copy_on_fork: bool = False,
next_item_mode: TPipeNextItemMode = "round_robin",
rate_limit: Optional[float] = None,
) -> "PipeIterator":
# print(f"max_parallel_items: {max_parallel_items} workers: {workers}")
sources: List[SourcePipeItem] = []
@@ -141,7 +152,9 @@ def _fork_pipeline(pipe: Pipe) -> None:
_fork_pipeline(pipe)

# create extractor
return cls(max_parallel_items, workers, futures_poll_interval, sources, next_item_mode)
return cls(
max_parallel_items, workers, futures_poll_interval, sources, next_item_mode, rate_limit
)

def __next__(self) -> PipeItem:
pipe_item: Union[ResolvablePipeItem, SourcePipeItem] = None
@@ -253,6 +266,13 @@ def _get_source_item(self) -> ResolvablePipeItem:
# no more sources to iterate
if sources_count == 0:
return None
# if rate limited, sleep a very small amount of time and go back to evaluating futures
if self._rate_limit is not None:
min_time_between_items = 1 / self._rate_limit
time_since_last_source_item = time.time() - self._last_source_item_time
if time_since_last_source_item < min_time_between_items:
time.sleep(0.01)
return None
try:
first_evaluated_index: int = None
# always reset to end of list for fifo mode, also take into account that new sources can be added
@@ -272,19 +292,24 @@ def _get_source_item(self) -> ResolvablePipeItem:
set_current_pipe_name(pipe.name)

pipe_item = next(gen)
result: Optional[ResolvablePipeItem] = None
if pipe_item is not None:
# full pipe item may be returned, this is used by ForkPipe step
# to redirect execution of an item to another pipe
# else
if not isinstance(pipe_item, ResolvablePipeItem):
# keep the item assigned step and pipe when creating resolvable item
if isinstance(pipe_item, DataItemWithMeta):
return ResolvablePipeItem(pipe_item.data, step, pipe, pipe_item.meta)
result = ResolvablePipeItem(pipe_item.data, step, pipe, pipe_item.meta)
else:
return ResolvablePipeItem(pipe_item, step, pipe, meta)
result = ResolvablePipeItem(pipe_item, step, pipe, meta)

if pipe_item is not None:
return pipe_item
result = result or pipe_item

if result is not None:
if self._rate_limit:
self._last_source_item_time = time.time()
return result

# remember the first evaluated index
if first_evaluated_index is None:
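Stripped of the pipe machinery, the check-and-sleep mechanism this diff adds to `_get_source_item` works roughly like this. A minimal standalone sketch, not the PR's exact code:

```python
import time


def rate_limited(produce, rate_limit=None):
    """Yield items from produce(), allowing at most rate_limit items per second."""
    last_item_time = 0.0
    for item in produce():
        if rate_limit is not None:
            min_interval = 1.0 / rate_limit
            # poll in small steps, mirroring the 0.01 s sleep in the diff, so the
            # caller could interleave other work (evaluating futures) between checks
            while time.time() - last_item_time < min_interval:
                time.sleep(0.01)
            last_item_time = time.time()
        yield item
```

In the real iterator the sleep happens once per call and control returns to the futures loop instead of spinning in place, but the timestamp-and-minimum-interval logic is the same.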
13 changes: 12 additions & 1 deletion docs/website/docs/general-usage/source.md
@@ -113,6 +113,18 @@ Note that `add_limit` **does not limit the number of records** but rather the "n…

Find more on sampling data [here](resource.md#sample-from-large-data).

### Rate limiting

You can limit the rate of data extraction by setting the `rate_limit` option in the extract configuration. This is useful when you are extracting data from a rate-limited API or database, for example:

```toml
[extract]
# this will limit the rate to 10 resource iterations per second
rate_limit = 10
```

The rate limit applies to all resources across a given source, so setting a rate limit of 10 results in a maximum of 10 calls per second to an API, even if you are running 3 resources in parallel.
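The same option can be provided through an environment variable, which is how the tests in this PR set it:

```python
import os

# equivalent to `rate_limit = 10` under [extract] in config.toml
os.environ["EXTRACT__RATE_LIMIT"] = "10"
```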

### Rename the source
`dlt` allows you to rename the source ie. to place the source configuration into custom section or to have many instances
of the source created side by side. For example:
@@ -128,7 +140,6 @@ credentials from:
[sources.my_db.my_db.credentials]
password="..."
```

### Add more resources to existing source

You can add a custom resource to a source after it was created. Imagine that you want to score all the deals with a keras model that will tell you if the deal is a fraud or not. In order to do that, you declare a new [transformer that takes the data from](resource.md#feeding-data-from-one-resource-into-another) `deals` resource and add it to the source.
55 changes: 55 additions & 0 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@

import pytest
import asyncio
import time

import dlt, os
from dlt.common.configuration.container import Container
@@ -895,6 +896,60 @@ def test_source(expected_state):
assert state.state == {"sources": {"test_source": {"value": 1}}}


@pytest.mark.parametrize("rate_limit", (None, 10, 100))
def test_source_rate_limit(rate_limit: int) -> None:
if rate_limit is not None:
os.environ["EXTRACT__RATE_LIMIT"] = str(rate_limit)
else:
try:
del os.environ["EXTRACT__RATE_LIMIT"]
except KeyError:
pass

@dlt.resource
def rate_limited_resource():
yield from range(10)

start_time = time.time()
r = rate_limited_resource().add_limit(10)
assert list(r) == list(range(10))
elapsed = time.time() - start_time

if rate_limit is None:
assert elapsed < 0.2
elif rate_limit == 10:
assert (elapsed > 1.0) and (elapsed < 3.0)
elif rate_limit == 100:
assert (elapsed > 0.1) and (elapsed < 1.0)

# run another check with a source and transformer
# here we verify that the rate limit also works with multiple resources and with transformers
# we have 6 * 3 items altogether, plus an additional 3 * 3 items from the resource
# that is piped into the transformer, which makes 9 * 3 = 27 rate-limited calls
@dlt.source
def infinite_source():
for idx in range(3):
r = dlt.resource(itertools.count(), name=f"infinity_{idx}").add_limit(3)
yield r
yield r | dlt.transformer(name=f"mul_c_{idx}")(lambda i: i * 2)

start_time = time.time()
assert list(infinite_source()) == [0, 0, 2, 1, 4, 2] * 3
elapsed = time.time() - start_time

if rate_limit is None:
assert elapsed < 0.2
elif rate_limit == 10:
assert (elapsed > 2.5) and (elapsed < 5.0)
elif rate_limit == 100:
assert (elapsed > 0.2) and (elapsed < 1.0)

try:
del os.environ["EXTRACT__RATE_LIMIT"]
except KeyError:
pass


def test_resource_state() -> None:
@dlt.resource
def test_resource():