Skip to content

Commit

Permalink
Leakage class (#1510)
Browse files Browse the repository at this point in the history
* Leakage class

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix lint issues

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
abhinavsingh and pre-commit-ci[bot] authored Nov 22, 2024
1 parent 71d796b commit ad2e107
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
52 changes: 52 additions & 0 deletions proxy/common/leakage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time


class Leakage:
"""Leaky Bucket algorithm."""

def __init__(self, rate: int) -> None:
"""Initialize the leaky bucket with a specified leak rate in bytes per second."""
# Maximum number of tokens the bucket can hold (bytes per second)
self.rate = rate
self.tokens = rate
self.last_check = time.time()

def _refill(self) -> None:
"""Refill tokens based on the elapsed time since the last check."""
now = time.time()
elapsed = now - self.last_check
# Add tokens proportional to elapsed time, up to the rate
self.tokens += int(elapsed * self.rate)
# Cap tokens at the maximum rate to enforce the rate limit
self.tokens = min(self.tokens, self.rate)
self.last_check = now

def release(self, tokens: int) -> None:
"""When you are unable to consume amount units of token, release them into the bucket.
E.g. say you wanted to read 1024 units, but only 24 units were read, then put
back unconsumed 1000 tokens back in the bucket."""
if tokens < 0:
raise ValueError('Cannot release a negative number of tokens')
self.tokens += tokens
self.tokens = min(self.tokens, self.rate)

def consume(self, amount: int) -> int:
"""Attempt to consume the amount from the bucket.
Returns the amount allowed to be sent, up to the available tokens (rate).
"""
self._refill()
allowed = min(amount, self.tokens)
self.tokens -= allowed
return allowed
61 changes: 61 additions & 0 deletions tests/common/test_leakage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time

import unittest

from proxy.common.leakage import Leakage


class TestLeakage(unittest.TestCase):

def test_initial_consume_no_tokens(self) -> None:
# Test consuming with no tokens available initially
rate = 100 # bytes per second
bucket = Leakage(rate)
self.assertEqual(
bucket.consume(150),
100,
) # No tokens yet, so expect 0 bytes to be sent

def test_consume_with_refill(self) -> None:
# Test consuming with refill after waiting
rate = 100 # bytes per second
bucket = Leakage(rate)
time.sleep(1) # Wait for a second to allow refill
self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available

def test_consume_above_leak_rate(self) -> None:
# Test attempting to consume more than the leak rate after a refill
rate = 100 # bytes per second
bucket = Leakage(rate)
time.sleep(1) # Wait for a second to allow refill
self.assertEqual(bucket.consume(150), 100) # Only 100 bytes should be allowed

def test_repeated_consume_with_partial_refill(self) -> None:
# Test repeated consumption with partial refill
rate = 100 # bytes per second
bucket = Leakage(rate)

time.sleep(1) # Allow tokens to accumulate
bucket.consume(80) # Consume 80 bytes, should leave 20
time.sleep(0.5) # Wait half a second to refill by 50 bytes

self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available now

def test_negative_token_guard(self) -> None:
# Ensure tokens do not go negative
rate = 100 # bytes per second
bucket = Leakage(rate)
time.sleep(1) # Allow tokens to accumulate
bucket.consume(150) # Consume all available tokens
self.assertEqual(bucket.consume(10), 0) # Should return 0 as no tokens are left
self.assertEqual(bucket.tokens, 0) # Tokens should not be negative

0 comments on commit ad2e107

Please sign in to comment.