Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leakage class #1510

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions proxy/common/leakage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time


class Leakage:
"""Leaky Bucket algorithm."""

def __init__(self, rate: int) -> None:
"""Initialize the leaky bucket with a specified leak rate in bytes per second."""
# Maximum number of tokens the bucket can hold (bytes per second)
self.rate = rate
self.tokens = rate
self.last_check = time.time()

def _refill(self) -> None:
"""Refill tokens based on the elapsed time since the last check."""
now = time.time()
elapsed = now - self.last_check
# Add tokens proportional to elapsed time, up to the rate
self.tokens += int(elapsed * self.rate)
# Cap tokens at the maximum rate to enforce the rate limit
self.tokens = min(self.tokens, self.rate)
self.last_check = now

def release(self, tokens: int) -> None:
"""When you are unable to consume amount units of token, release them into the bucket.

E.g. say you wanted to read 1024 units, but only 24 units were read, then put
back unconsumed 1000 tokens back in the bucket."""
if tokens < 0:
raise ValueError('Cannot release a negative number of tokens')
self.tokens += tokens
self.tokens = min(self.tokens, self.rate)

Check warning on line 42 in proxy/common/leakage.py

View check run for this annotation

Codecov / codecov/patch

proxy/common/leakage.py#L40-L42

Added lines #L40 - L42 were not covered by tests

def consume(self, amount: int) -> int:
"""Attempt to consume the amount from the bucket.

Returns the amount allowed to be sent, up to the available tokens (rate).
"""
self._refill()
allowed = min(amount, self.tokens)
self.tokens -= allowed
return allowed
61 changes: 61 additions & 0 deletions tests/common/test_leakage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time

import unittest

from proxy.common.leakage import Leakage


class TestLeakage(unittest.TestCase):

def test_initial_consume_no_tokens(self) -> None:
# Test consuming with no tokens available initially
rate = 100 # bytes per second
bucket = Leakage(rate)
self.assertEqual(
bucket.consume(150),
100,
) # No tokens yet, so expect 0 bytes to be sent

def test_consume_with_refill(self) -> None:
# Test consuming with refill after waiting
rate = 100 # bytes per second
bucket = Leakage(rate)
time.sleep(1) # Wait for a second to allow refill
self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available

def test_consume_above_leak_rate(self) -> None:
# Test attempting to consume more than the leak rate after a refill
rate = 100 # bytes per second
bucket = Leakage(rate)
time.sleep(1) # Wait for a second to allow refill
self.assertEqual(bucket.consume(150), 100) # Only 100 bytes should be allowed

def test_repeated_consume_with_partial_refill(self) -> None:
# Test repeated consumption with partial refill
rate = 100 # bytes per second
bucket = Leakage(rate)

time.sleep(1) # Allow tokens to accumulate
bucket.consume(80) # Consume 80 bytes, should leave 20
time.sleep(0.5) # Wait half a second to refill by 50 bytes

self.assertEqual(bucket.consume(50), 50) # 50 bytes should be available now

def test_negative_token_guard(self) -> None:
# Ensure tokens do not go negative
rate = 100 # bytes per second
bucket = Leakage(rate)
time.sleep(1) # Allow tokens to accumulate
bucket.consume(150) # Consume all available tokens
self.assertEqual(bucket.consume(10), 0) # Should return 0 as no tokens are left
self.assertEqual(bucket.tokens, 0) # Tokens should not be negative
Loading