Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build(ingest): remove ratelimiter dependency #9008

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
"progressbar2",
"termcolor>=1.0.0",
"psutil>=5.8.0",
"ratelimiter",
"Deprecated",
"humanfriendly",
"packaging",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from google.cloud import bigquery
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from ratelimiter import RateLimiter

from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
AuditLogEntry,
Expand All @@ -17,6 +16,7 @@
BQ_DATE_SHARD_FORMAT,
BQ_DATETIME_FORMAT,
)
from datahub.utilities.ratelimiter import RateLimiter

logger: logging.Logger = logging.getLogger(__name__)

Expand Down
56 changes: 56 additions & 0 deletions metadata-ingestion/src/datahub/utilities/ratelimiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import collections
import threading
import time
from contextlib import AbstractContextManager
from typing import Any, Deque


# Modified version of https://github.com/RazerM/ratelimiter/blob/master/ratelimiter/_sync.py
class RateLimiter(AbstractContextManager):
    """Context manager that throttles how often a guarded block may run.

    A timestamp is recorded every time a guarded block exits. When
    ``max_calls`` timestamps are retained, the next ``__enter__`` sleeps for
    whatever is left of ``period`` (measured from the oldest retained
    timestamp to the newest) before letting the caller through.

    Timestamps come from ``time.monotonic()`` rather than the wall clock so
    that system clock adjustments can neither stretch the wait nor empty the
    window early.
    """

    def __init__(self, max_calls: int, period: float = 1.0) -> None:
        """Create a limiter allowing ``max_calls`` operations per ``period``.

        Args:
            max_calls: Number of retained calls after which entering waits.
            period: Length of the window in seconds (may be fractional).

        Raises:
            ValueError: If ``period`` or ``max_calls`` is not strictly positive.
        """
        if period <= 0:
            raise ValueError("Rate limiting period should be > 0")
        if max_calls <= 0:
            raise ValueError("Rate limiting number of calls should be > 0")

        # A deque is used for its constant-time removal from the front, not
        # for its maxlen feature. Entries are time.monotonic() readings.
        self.calls: Deque[float] = collections.deque()

        self.period = period
        self.max_calls = max_calls
        self._lock = threading.Lock()

    def __enter__(self) -> "RateLimiter":
        """Wait, if the window is full, until another call is allowed.

        Returns:
            This limiter, so it can be bound with ``with ... as``.
        """
        with self._lock:
            # The check runs on every entry. The lock stays held while
            # sleeping, so other threads entering meanwhile wait as well.
            if len(self.calls) >= self.max_calls:
                sleeptime = self.period - self._timespan
                if sleeptime > 0:
                    time.sleep(sleeptime)
            return self

    def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None:
        """Record the finished call and drop timestamps outside the window.

        Exceptions raised inside the guarded block are not suppressed.
        """
        with self._lock:
            # Store the timestamp of the operation that just finished.
            self.calls.append(time.monotonic())

            # Discard the oldest timestamps until the retained span is shorter
            # than the period again: this is the sliding window. The loop
            # cannot empty the deque because a single entry spans 0 seconds
            # and the period is strictly positive.
            while self._timespan >= self.period:
                self.calls.popleft()

    @property
    def _timespan(self) -> float:
        """Seconds between the oldest and the newest retained timestamps.

        Requires ``self.calls`` to be non-empty; both callers guarantee that.
        """
        return self.calls[-1] - self.calls[0]
20 changes: 20 additions & 0 deletions metadata-ingestion/tests/unit/utilities/test_ratelimiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from collections import defaultdict
from datetime import datetime
from typing import Dict

from datahub.utilities.ratelimiter import RateLimiter


def test_rate_is_limited():
    """Run 18 guarded calls at 5 calls/second and check how they spread out.

    Each call is counted in a bucket keyed by the wall-clock second in which
    it ran (``datetime.now()`` truncated to whole seconds).
    """
    limit_per_second = 5
    call_count = 18
    limiter = RateLimiter(max_calls=limit_per_second, period=1)

    calls_by_second: Dict[float, int] = defaultdict(int)
    for _ in range(call_count):
        with limiter:
            # Truncate the current time to the second to get the bucket key.
            second_bucket = datetime.now().replace(microsecond=0).timestamp()
            calls_by_second[second_bucket] += 1

    # The calls must be spread over the expected number of distinct seconds.
    # NOTE(review): this count depends on where wall-clock second boundaries
    # fall relative to the calls -- confirm it is stable on CI.
    assert len(calls_by_second) == round(call_count / limit_per_second)
    # No single second may contain more calls than the configured limit.
    assert all(count <= limit_per_second for count in calls_by_second.values())
    # Every call must have been counted exactly once.
    assert sum(calls_by_second.values()) == call_count