From afaa5dca10da45cf49f3048e337ca20f5662a5b6 Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Fri, 13 Oct 2023 19:08:26 +0530 Subject: [PATCH 1/2] build(ingest): remove ratelimiter dependency --- metadata-ingestion/setup.py | 1 - .../bigquery_v2/bigquery_audit_log_api.py | 2 +- .../src/datahub/utilities/ratelimiter.py | 56 +++++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 metadata-ingestion/src/datahub/utilities/ratelimiter.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 3ea9a2ea61d740..2659f43b22c633 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -38,7 +38,6 @@ "progressbar2", "termcolor>=1.0.0", "psutil>=5.8.0", - "ratelimiter", "Deprecated", "humanfriendly", "packaging", diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py index 03b12c61ee5c6c..db552c09cd0a73 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py @@ -4,7 +4,6 @@ from google.cloud import bigquery from google.cloud.logging_v2.client import Client as GCPLoggingClient -from ratelimiter import RateLimiter from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( AuditLogEntry, @@ -17,6 +16,7 @@ BQ_DATE_SHARD_FORMAT, BQ_DATETIME_FORMAT, ) +from datahub.utilities.ratelimiter import RateLimiter logger: logging.Logger = logging.getLogger(__name__) diff --git a/metadata-ingestion/src/datahub/utilities/ratelimiter.py b/metadata-ingestion/src/datahub/utilities/ratelimiter.py new file mode 100644 index 00000000000000..3d47d25e14c492 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/ratelimiter.py @@ -0,0 +1,56 @@ +import collections +import threading +import time +from contextlib import AbstractContextManager +from typing import Any, Deque + + +# Modified version of https://github.com/RazerM/ratelimiter/blob/master/ratelimiter/_sync.py +class RateLimiter(AbstractContextManager): + + """Provides rate limiting for an operation with a configurable number of + requests for a time period. + """ + + def __init__(self, max_calls: int, period: float = 1.0) -> None: + """Initialize a RateLimiter object which enforces as much as max_calls + operations on period (eventually floating) number of seconds. + """ + if period <= 0: + raise ValueError("Rate limiting period should be > 0") + if max_calls <= 0: + raise ValueError("Rate limiting number of calls should be > 0") + + # We're using a deque to store the last execution timestamps, not for + # its maxlen attribute, but to allow constant time front removal. + self.calls: Deque = collections.deque() + + self.period = period + self.max_calls = max_calls + self._lock = threading.Lock() + + def __enter__(self) -> "RateLimiter": + with self._lock: + # We want to ensure that no more than max_calls were run in the allowed + # period. For this, we store the last timestamps of each call and run + # the rate verification upon each __enter__ call. + if len(self.calls) >= self.max_calls: + until = time.time() + self.period - self._timespan + sleeptime = until - time.time() + if sleeptime > 0: + time.sleep(sleeptime) + return self + + def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: + with self._lock: + # Store the last operation timestamp. + self.calls.append(time.time()) + + # Pop the timestamp list front (ie: the older calls) until the sum goes + # back below the period. This is our 'sliding period' window. + while self._timespan >= self.period: + self.calls.popleft() + + @property + def _timespan(self) -> float: + return self.calls[-1] - self.calls[0] From bbb3eba32a292c843a292f81864f27babaf9a0d3 Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Mon, 16 Oct 2023 16:08:09 +0530 Subject: [PATCH 2/2] add unit test --- .../tests/unit/utilities/test_ratelimiter.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 metadata-ingestion/tests/unit/utilities/test_ratelimiter.py diff --git a/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py b/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py new file mode 100644 index 00000000000000..0384e1f9188812 --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py @@ -0,0 +1,20 @@ +from collections import defaultdict +from datetime import datetime +from typing import Dict + +from datahub.utilities.ratelimiter import RateLimiter + + +def test_rate_is_limited(): + MAX_CALLS_PER_SEC = 5 + TOTAL_CALLS = 18 + actual_calls: Dict[float, int] = defaultdict(lambda: 0) + + ratelimiter = RateLimiter(max_calls=MAX_CALLS_PER_SEC, period=1) + for _ in range(TOTAL_CALLS): + with ratelimiter: + actual_calls[datetime.now().replace(microsecond=0).timestamp()] += 1 + + assert len(actual_calls) == round(TOTAL_CALLS / MAX_CALLS_PER_SEC) + assert all(calls <= MAX_CALLS_PER_SEC for calls in actual_calls.values()) + assert sum(actual_calls.values()) == TOTAL_CALLS