From ad2e1074b36f190d1f5d583190b76770a3612405 Mon Sep 17 00:00:00 2001 From: Abhinav Singh <126065+abhinavsingh@users.noreply.github.com> Date: Fri, 22 Nov 2024 10:07:27 +0530 Subject: [PATCH] Leakage class (#1510) * Leakage class * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Fix lint issues --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- proxy/common/leakage.py | 52 ++++++++++++++++++++++++++++++ tests/common/test_leakage.py | 61 ++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 proxy/common/leakage.py create mode 100644 tests/common/test_leakage.py diff --git a/proxy/common/leakage.py b/proxy/common/leakage.py new file mode 100644 index 0000000000..914b76f375 --- /dev/null +++ b/proxy/common/leakage.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +import time + + +class Leakage: + """Leaky Bucket algorithm.""" + + def __init__(self, rate: int) -> None: + """Initialize the leaky bucket with a specified leak rate in bytes per second.""" + # Maximum number of tokens the bucket can hold (bytes per second) + self.rate = rate + self.tokens = rate + self.last_check = time.time() + + def _refill(self) -> None: + """Refill tokens based on the elapsed time since the last check.""" + now = time.time() + elapsed = now - self.last_check + # Add tokens proportional to elapsed time, up to the rate + self.tokens += int(elapsed * self.rate) + # Cap tokens at the maximum rate to enforce the rate limit + self.tokens = min(self.tokens, self.rate) + self.last_check = now + + def release(self, tokens: int) -> None: + """When you are unable to consume amount units of token, release them into the bucket. + + E.g. say you wanted to read 1024 units, but only 24 units were read, then put + back unconsumed 1000 tokens back in the bucket.""" + if tokens < 0: + raise ValueError('Cannot release a negative number of tokens') + self.tokens += tokens + self.tokens = min(self.tokens, self.rate) + + def consume(self, amount: int) -> int: + """Attempt to consume the amount from the bucket. + + Returns the amount allowed to be sent, up to the available tokens (rate). + """ + self._refill() + allowed = min(amount, self.tokens) + self.tokens -= allowed + return allowed diff --git a/tests/common/test_leakage.py b/tests/common/test_leakage.py new file mode 100644 index 0000000000..564139cfac --- /dev/null +++ b/tests/common/test_leakage.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +import time + +import unittest + +from proxy.common.leakage import Leakage + + +class TestLeakage(unittest.TestCase): + + def test_initial_consume_no_tokens(self) -> None: + # Test consuming with no tokens available initially + rate = 100 # bytes per second + bucket = Leakage(rate) + self.assertEqual( + bucket.consume(150), + 100, + ) # No tokens yet, so expect 0 bytes to be sent + + def test_consume_with_refill(self) -> None: + # Test consuming with refill after waiting + rate = 100 # bytes per second + bucket = Leakage(rate) + time.sleep(1) # Wait for a second to allow refill + self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available + + def test_consume_above_leak_rate(self) -> None: + # Test attempting to consume more than the leak rate after a refill + rate = 100 # bytes per second + bucket = Leakage(rate) + time.sleep(1) # Wait for a second to allow refill + self.assertEqual(bucket.consume(150), 100) # Only 100 bytes should be allowed + + def test_repeated_consume_with_partial_refill(self) -> None: + # Test repeated consumption with partial refill + rate = 100 # bytes per second + bucket = Leakage(rate) + + time.sleep(1) # Allow tokens to accumulate + bucket.consume(80) # Consume 80 bytes, should leave 20 + time.sleep(0.5) # Wait half a second to refill by 50 bytes + + self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available now + + def test_negative_token_guard(self) -> None: + # Ensure tokens do not go negative + rate = 100 # bytes per second + bucket = Leakage(rate) + time.sleep(1) # Allow tokens to accumulate + bucket.consume(150) # Consume all available tokens + self.assertEqual(bucket.consume(10), 0) # Should return 0 as no tokens are left + self.assertEqual(bucket.tokens, 0) # Tokens should not be negative