From e013f33acf06b1f6af0870c4fc9f323b8b8994e6 Mon Sep 17 00:00:00 2001 From: Greaka Date: Sun, 19 Feb 2023 19:18:30 +0100 Subject: [PATCH 1/5] feat(http-ratelimiting): add a bucket for global rate limits --- .../src/in_memory/bucket.rs | 66 ++++++------ .../src/in_memory/global_bucket.rs | 100 ++++++++++++++++++ .../src/in_memory/mod.rs | 43 ++------ 3 files changed, 140 insertions(+), 69 deletions(-) create mode 100644 twilight-http-ratelimiting/src/in_memory/global_bucket.rs diff --git a/twilight-http-ratelimiting/src/in_memory/bucket.rs b/twilight-http-ratelimiting/src/in_memory/bucket.rs index fbc33894b29..691ef3993ad 100644 --- a/twilight-http-ratelimiting/src/in_memory/bucket.rs +++ b/twilight-http-ratelimiting/src/in_memory/bucket.rs @@ -3,7 +3,8 @@ //! consumed by the [`BucketQueueTask`] that manages the ratelimit for the bucket //! and respects the global ratelimit. -use super::GlobalLockPair; +use super::GlobalBucket; +use crate::ticket::TicketSender; use crate::{headers::RatelimitHeaders, request::Path, ticket::TicketNotifier}; use std::{ collections::HashMap, @@ -13,6 +14,7 @@ use std::{ }, time::{Duration, Instant}, }; +use tokio::sync::oneshot::error::RecvError; use tokio::{ sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -55,11 +57,11 @@ impl Bucket { /// Create a new bucket for the specified [`Path`]. pub fn new(path: Path) -> Self { Self { - limit: AtomicU64::new(u64::max_value()), + limit: AtomicU64::new(u64::MAX), path, queue: BucketQueue::default(), - remaining: AtomicU64::new(u64::max_value()), - reset_after: AtomicU64::new(u64::max_value()), + remaining: AtomicU64::new(u64::MAX), + reset_after: AtomicU64::new(u64::MAX), started_at: Mutex::new(None), } } @@ -134,7 +136,7 @@ impl Bucket { } if let Some((limit, remaining, reset_after)) = ratelimits { - if bucket_limit != limit && bucket_limit == u64::max_value() { + if bucket_limit != limit && bucket_limit == u64::MAX { self.reset_after.store(reset_after, Ordering::SeqCst); self.limit.store(limit, Ordering::SeqCst); } @@ -162,10 +164,10 @@ impl BucketQueue { } /// Receive the first incoming ratelimit request. - pub async fn pop(&self, timeout_duration: Duration) -> Option { + pub async fn pop(&self) -> Option { let mut rx = self.rx.lock().await; - timeout(timeout_duration, rx.recv()).await.ok().flatten() + rx.recv().await } } @@ -189,7 +191,7 @@ pub(super) struct BucketQueueTask { /// All buckets managed by the associated [`super::InMemoryRatelimiter`]. buckets: Arc>>>, /// Global ratelimit data. - global: Arc, + global: GlobalBucket, /// The [`Path`] this [`Bucket`] belongs to. 
path: Path, } @@ -202,7 +204,7 @@ impl BucketQueueTask { pub fn new( bucket: Arc, buckets: Arc>>>, - global: Arc, + global: GlobalBucket, path: Path, ) -> Self { Self { @@ -218,9 +220,11 @@ impl BucketQueueTask { #[tracing::instrument(name = "background queue task", skip(self), fields(path = ?self.path))] pub async fn run(self) { while let Some(queue_tx) = self.next().await { - if self.global.is_locked() { - drop(self.global.0.lock().await); - } + let global_ticket_tx = if let Ok(ticket_tx) = self.wait_for_global().await { + ticket_tx + } else { + continue; + }; let ticket_headers = if let Some(ticket_headers) = queue_tx.available() { ticket_headers @@ -231,7 +235,10 @@ impl BucketQueueTask { tracing::debug!("starting to wait for response headers"); match timeout(Self::WAIT, ticket_headers).await { - Ok(Ok(Some(headers))) => self.handle_headers(&headers).await, + Ok(Ok(Some(headers))) => { + self.handle_headers(&headers).await; + global_ticket_tx.headers(Some(headers)).ok(); + } Ok(Ok(None)) => { tracing::debug!("request aborted"); } @@ -252,43 +259,38 @@ impl BucketQueueTask { .remove(&self.path); } + #[tracing::instrument(name = "waiting for global bucket", skip_all)] + async fn wait_for_global(&self) -> Result { + let (tx, rx) = super::ticket::channel(); + self.global.queue().push(tx); + + tracing::debug!("waiting for global rate limit"); + let res = rx.await; + tracing::debug!("done waiting for global rate limit"); + + res + } + /// Update the bucket's ratelimit state. async fn handle_headers(&self, headers: &RatelimitHeaders) { let ratelimits = match headers { - RatelimitHeaders::Global(global) => { - self.lock_global(Duration::from_secs(global.retry_after())) - .await; - - None - } - RatelimitHeaders::None => return, RatelimitHeaders::Present(present) => { Some((present.limit(), present.remaining(), present.reset_after())) } + _ => return, }; tracing::debug!(path=?self.path, "updating bucket"); self.bucket.update(ratelimits); } - /// Lock the global ratelimit for a specified duration. - async fn lock_global(&self, wait: Duration) { - tracing::debug!(path=?self.path, "request got global ratelimited"); - self.global.lock(); - let lock = self.global.0.lock().await; - sleep(wait).await; - self.global.unlock(); - - drop(lock); - } - /// Get the next [`TicketNotifier`] in the queue. async fn next(&self) -> Option { tracing::debug!(path=?self.path, "starting to get next in queue"); self.wait_if_needed().await; - self.bucket.queue.pop(Self::WAIT).await + self.bucket.queue.pop().await } /// Wait for this bucket to refresh if it isn't ready yet. diff --git a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs new file mode 100644 index 00000000000..ccda77ca68f --- /dev/null +++ b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs @@ -0,0 +1,100 @@ +use super::bucket::BucketQueue; +use crate::RatelimitHeaders; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::Instant; + +/// seconds per period +const PERIOD: u64 = 1; +/// requests per period +const REQUESTS: u64 = 50; + +/// Global bucket. Keeps track of the global rate limit. +#[derive(Debug, Clone)] +pub struct GlobalBucket(Arc); + +impl GlobalBucket { + /// Queue of global ratelimit requests + pub fn queue(&self) -> &BucketQueue { + &self.0.queue + } + + /// Whether the global ratelimit is exhausted. 
+ pub fn is_locked(&self) -> bool { + self.0.is_locked.load(Ordering::Relaxed) + } +} + +impl Default for GlobalBucket { + fn default() -> Self { + Self(InnerGlobalBucket::new()) + } +} + +#[derive(Debug)] +struct InnerGlobalBucket { + pub queue: BucketQueue, + /// currently waiting for capacity + is_locked: AtomicBool, +} + +impl InnerGlobalBucket { + fn new() -> Arc { + let this = Self { + queue: Default::default(), + is_locked: Default::default(), + }; + let this = Arc::new(this); + + tokio::spawn(run_global_queue_task(this.clone())); + + this + } +} + +#[tracing::instrument(name = "background global queue task", skip_all)] +async fn run_global_queue_task(bucket: Arc) { + let mut time = Instant::now(); + + while let Some(queue_tx) = bucket.queue.pop().await { + wait_if_needed(bucket.as_ref(), &mut time).await; + + let ticket_headers = if let Some(ticket_headers) = queue_tx.available() { + ticket_headers + } else { + continue; + }; + + if let Ok(Some(RatelimitHeaders::Global(headers))) = ticket_headers.await { + tracing::debug!(seconds = headers.retry_after(), "globally ratelimited"); + + bucket.is_locked.store(true, Ordering::Release); + tokio::time::sleep(Duration::from_secs(headers.retry_after())).await; + bucket.is_locked.store(false, Ordering::Release); + } + } +} + +async fn wait_if_needed(bucket: &InnerGlobalBucket, time: &mut Instant) { + let period = Duration::from_secs(PERIOD); + let fill_rate = period / REQUESTS as u32; + + let now = Instant::now(); + // base contingent of 1 period worth of requests + let base = now - period; + // reset to base if no request came in for long enough + if base > *time { + *time = base; + } + + // we request one request worth of rate limit consumption + *time += fill_rate; + + // if time > now, wait until there is capacity available again + if *time > now { + bucket.is_locked.store(true, Ordering::Release); + tokio::time::sleep_until(*time).await; + bucket.is_locked.store(false, Ordering::Release); + } +} diff --git a/twilight-http-ratelimiting/src/in_memory/mod.rs b/twilight-http-ratelimiting/src/in_memory/mod.rs index 2479933a6ed..c1b2d1e893f 100644 --- a/twilight-http-ratelimiting/src/in_memory/mod.rs +++ b/twilight-http-ratelimiting/src/in_memory/mod.rs @@ -1,8 +1,10 @@ //! In-memory based default [`Ratelimiter`] implementation used in `twilight-http`. mod bucket; +mod global_bucket; use self::bucket::{Bucket, BucketQueueTask}; +pub use self::global_bucket::GlobalBucket; use super::{ ticket::{self, TicketNotifier}, Bucket as InfoBucket, Ratelimiter, @@ -13,37 +15,9 @@ use crate::{ use futures_util::future; use std::{ collections::hash_map::{Entry, HashMap}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, time::Duration, }; -use tokio::sync::Mutex as AsyncMutex; - -/// Global lock. We use a pair to avoid actually locking the mutex every check. -/// This allows futures to only wait on the global lock when a global ratelimit -/// is in place by, in turn, waiting for a guard, and then each immediately -/// dropping it. -#[derive(Debug, Default)] -struct GlobalLockPair(AsyncMutex<()>, AtomicBool); - -impl GlobalLockPair { - /// Set the global ratelimit as exhausted. - pub fn lock(&self) { - self.1.store(true, Ordering::Release); - } - - /// Set the global ratelimit as no longer exhausted. - pub fn unlock(&self) { - self.1.store(false, Ordering::Release); - } - - /// Whether the global ratelimit is exhausted. 
- pub fn is_locked(&self) -> bool { - self.1.load(Ordering::Relaxed) - } -} /// Default ratelimiter implementation used in twilight that /// stores ratelimit information in an in-memory mapping. @@ -59,7 +33,7 @@ pub struct InMemoryRatelimiter { /// Mapping of [`Path`]s to their associated [`Bucket`]s. buckets: Arc>>>, /// Global ratelimit data. - global: Arc, + global: GlobalBucket, } impl InMemoryRatelimiter { @@ -146,13 +120,8 @@ impl Ratelimiter for InMemoryRatelimiter { if let Some(bucket) = self.entry(path.clone(), tx) { tokio::spawn( - BucketQueueTask::new( - bucket, - Arc::clone(&self.buckets), - Arc::clone(&self.global), - path, - ) - .run(), + BucketQueueTask::new(bucket, Arc::clone(&self.buckets), self.global.clone(), path) + .run(), ); } From 0f33f8530a5056b2c8774faeab0a32f6fe838da2 Mon Sep 17 00:00:00 2001 From: Greaka Date: Sun, 19 Feb 2023 20:02:04 +0100 Subject: [PATCH 2/5] fix(http-ratelimiting): address clippy issues --- .../src/in_memory/bucket.rs | 4 ++-- .../src/in_memory/global_bucket.rs | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/twilight-http-ratelimiting/src/in_memory/bucket.rs b/twilight-http-ratelimiting/src/in_memory/bucket.rs index 691ef3993ad..38cd4848e29 100644 --- a/twilight-http-ratelimiting/src/in_memory/bucket.rs +++ b/twilight-http-ratelimiting/src/in_memory/bucket.rs @@ -236,7 +236,7 @@ impl BucketQueueTask { match timeout(Self::WAIT, ticket_headers).await { Ok(Ok(Some(headers))) => { - self.handle_headers(&headers).await; + self.handle_headers(&headers); global_ticket_tx.headers(Some(headers)).ok(); } Ok(Ok(None)) => { @@ -272,7 +272,7 @@ impl BucketQueueTask { } /// Update the bucket's ratelimit state. - async fn handle_headers(&self, headers: &RatelimitHeaders) { + fn handle_headers(&self, headers: &RatelimitHeaders) { let ratelimits = match headers { RatelimitHeaders::Present(present) => { Some((present.limit(), present.remaining(), present.reset_after())) diff --git a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs index ccda77ca68f..b9bb63217d8 100644 --- a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs +++ b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs @@ -1,3 +1,5 @@ +//! Bucket implementation for a global ratelimit. + use super::bucket::BucketQueue; use crate::RatelimitHeaders; use std::sync::atomic::{AtomicBool, Ordering}; @@ -8,14 +10,14 @@ use tokio::time::Instant; /// seconds per period const PERIOD: u64 = 1; /// requests per period -const REQUESTS: u64 = 50; +const REQUESTS: u32 = 50; /// Global bucket. Keeps track of the global rate limit. #[derive(Debug, Clone)] pub struct GlobalBucket(Arc); impl GlobalBucket { - /// Queue of global ratelimit requests + /// Queue of global ratelimit requests. pub fn queue(&self) -> &BucketQueue { &self.0.queue } @@ -32,18 +34,21 @@ impl Default for GlobalBucket { } } +/// Inner struct to allow [`GlobalBucket`] to return an [`Arc`]. #[derive(Debug)] struct InnerGlobalBucket { + /// Queue to receive rate limit requests. pub queue: BucketQueue, - /// currently waiting for capacity + /// currently waiting for capacity. is_locked: AtomicBool, } impl InnerGlobalBucket { + /// Creates a new bucket and starts a task processing incoming requests. 
fn new() -> Arc { let this = Self { - queue: Default::default(), - is_locked: Default::default(), + queue: BucketQueue::default(), + is_locked: AtomicBool::default(), }; let this = Arc::new(this); @@ -76,9 +81,10 @@ async fn run_global_queue_task(bucket: Arc) { } } +/// Checks and sleeps in case a request needs to wait before proceeding. async fn wait_if_needed(bucket: &InnerGlobalBucket, time: &mut Instant) { let period = Duration::from_secs(PERIOD); - let fill_rate = period / REQUESTS as u32; + let fill_rate = period / REQUESTS; let now = Instant::now(); // base contingent of 1 period worth of requests From ff9a4e755ce740500d27690d4446a1523a403de4 Mon Sep 17 00:00:00 2001 From: Greaka Date: Sun, 19 Feb 2023 20:16:49 +0100 Subject: [PATCH 3/5] feat(http-ratelimiting): add the ability to specify custom global rate limits --- .../src/in_memory/global_bucket.rs | 31 ++++++++++++++----- .../src/in_memory/mod.rs | 13 ++++++++ 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs index b9bb63217d8..b5ce6ca0a46 100644 --- a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs +++ b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs @@ -17,6 +17,16 @@ const REQUESTS: u32 = 50; pub struct GlobalBucket(Arc); impl GlobalBucket { + /// Creates a new global bucket using custom ratelimit values. + /// + /// `period` is given in seconds. + /// + /// `requests` indicates the amount of requests per period. + #[must_use] + pub fn with_ratelimit(period: u64, requests: u32) -> Self { + Self(InnerGlobalBucket::new(period, requests)) + } + /// Queue of global ratelimit requests. pub fn queue(&self) -> &BucketQueue { &self.0.queue @@ -30,7 +40,7 @@ impl GlobalBucket { impl Default for GlobalBucket { fn default() -> Self { - Self(InnerGlobalBucket::new()) + Self(InnerGlobalBucket::new(PERIOD, REQUESTS)) } } @@ -45,25 +55,25 @@ struct InnerGlobalBucket { impl InnerGlobalBucket { /// Creates a new bucket and starts a task processing incoming requests. - fn new() -> Arc { + fn new(period: u64, requests: u32) -> Arc { let this = Self { queue: BucketQueue::default(), is_locked: AtomicBool::default(), }; let this = Arc::new(this); - tokio::spawn(run_global_queue_task(this.clone())); + tokio::spawn(run_global_queue_task(this.clone(), period, requests)); this } } #[tracing::instrument(name = "background global queue task", skip_all)] -async fn run_global_queue_task(bucket: Arc) { +async fn run_global_queue_task(bucket: Arc, period: u64, requests: u32) { let mut time = Instant::now(); while let Some(queue_tx) = bucket.queue.pop().await { - wait_if_needed(bucket.as_ref(), &mut time).await; + wait_if_needed(bucket.as_ref(), &mut time, period, requests).await; let ticket_headers = if let Some(ticket_headers) = queue_tx.available() { ticket_headers @@ -82,9 +92,14 @@ async fn run_global_queue_task(bucket: Arc) { } /// Checks and sleeps in case a request needs to wait before proceeding. 
-async fn wait_if_needed(bucket: &InnerGlobalBucket, time: &mut Instant) { - let period = Duration::from_secs(PERIOD); - let fill_rate = period / REQUESTS; +async fn wait_if_needed( + bucket: &InnerGlobalBucket, + time: &mut Instant, + period: u64, + requests: u32, +) { + let period = Duration::from_secs(period); + let fill_rate = period / requests; let now = Instant::now(); // base contingent of 1 period worth of requests diff --git a/twilight-http-ratelimiting/src/in_memory/mod.rs b/twilight-http-ratelimiting/src/in_memory/mod.rs index c1b2d1e893f..50b25ec9e70 100644 --- a/twilight-http-ratelimiting/src/in_memory/mod.rs +++ b/twilight-http-ratelimiting/src/in_memory/mod.rs @@ -46,6 +46,19 @@ impl InMemoryRatelimiter { Self::default() } + /// Create a new in-memory ratelimiter using custom ratelimit values. + /// + /// `period` is given in seconds. + /// + /// `requests` indicates the amount of requests per period. + #[must_use] + pub fn with_global_ratelimit(period: u64, requests: u32) -> Self { + Self { + global: GlobalBucket::with_ratelimit(period, requests), + ..Self::default() + } + } + /// Enqueue the [`TicketNotifier`] to the [`Path`]'s [`Bucket`]. /// /// Returns the new [`Bucket`] if none existed. From 9d2400f675ee42ebb9aeb48390018abc9cbd055e Mon Sep 17 00:00:00 2001 From: Greaka Date: Sat, 25 Feb 2023 20:00:57 +0100 Subject: [PATCH 4/5] feat(http-ratelimiting): allow multiple requests at once --- .../src/in_memory/bucket.rs | 9 ++-- .../src/in_memory/global_bucket.rs | 46 ++++++++++++------- .../src/in_memory/mod.rs | 2 +- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/twilight-http-ratelimiting/src/in_memory/bucket.rs b/twilight-http-ratelimiting/src/in_memory/bucket.rs index 38cd4848e29..a87c5b84bcc 100644 --- a/twilight-http-ratelimiting/src/in_memory/bucket.rs +++ b/twilight-http-ratelimiting/src/in_memory/bucket.rs @@ -220,11 +220,8 @@ impl BucketQueueTask { #[tracing::instrument(name = "background queue task", skip(self), fields(path = ?self.path))] pub async fn run(self) { while let Some(queue_tx) = self.next().await { - let global_ticket_tx = if let Ok(ticket_tx) = self.wait_for_global().await { - ticket_tx - } else { - continue; - }; + // Do not lock up if the global rate limiter crashes for any reason + let global_ticket_tx = self.wait_for_global().await.ok(); let ticket_headers = if let Some(ticket_headers) = queue_tx.available() { ticket_headers @@ -237,7 +234,7 @@ impl BucketQueueTask { match timeout(Self::WAIT, ticket_headers).await { Ok(Ok(Some(headers))) => { self.handle_headers(&headers); - global_ticket_tx.headers(Some(headers)).ok(); + global_ticket_tx.and_then(|tx| tx.headers(Some(headers)).ok()); } Ok(Ok(None)) => { tracing::debug!("request aborted"); diff --git a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs index b5ce6ca0a46..b8ce7d41b61 100644 --- a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs +++ b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs @@ -1,10 +1,11 @@ //! Bucket implementation for a global ratelimit. use super::bucket::BucketQueue; +use crate::ticket::TicketNotifier; use crate::RatelimitHeaders; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::{Mutex, Semaphore}; use tokio::time::Instant; /// seconds per period @@ -34,7 +35,7 @@ impl GlobalBucket { /// Whether the global ratelimit is exhausted. 
pub fn is_locked(&self) -> bool { - self.0.is_locked.load(Ordering::Relaxed) + self.0.is_locked.try_lock().is_err() } } @@ -50,7 +51,7 @@ struct InnerGlobalBucket { /// Queue to receive rate limit requests. pub queue: BucketQueue, /// currently waiting for capacity. - is_locked: AtomicBool, + is_locked: Mutex<()>, } impl InnerGlobalBucket { @@ -58,7 +59,7 @@ impl InnerGlobalBucket { fn new(period: u64, requests: u32) -> Arc { let this = Self { queue: BucketQueue::default(), - is_locked: AtomicBool::default(), + is_locked: Mutex::default(), }; let this = Arc::new(this); @@ -71,23 +72,35 @@ impl InnerGlobalBucket { #[tracing::instrument(name = "background global queue task", skip_all)] async fn run_global_queue_task(bucket: Arc, period: u64, requests: u32) { let mut time = Instant::now(); + let semaphore = Arc::new(Semaphore::new(requests as usize)); while let Some(queue_tx) = bucket.queue.pop().await { wait_if_needed(bucket.as_ref(), &mut time, period, requests).await; - let ticket_headers = if let Some(ticket_headers) = queue_tx.available() { - ticket_headers - } else { - continue; - }; + tokio::spawn(process_request(bucket.clone(), semaphore.clone(), queue_tx)); + } +} + +#[tracing::instrument(name = "process request", skip_all)] +async fn process_request( + bucket: Arc, + semaphore: Arc, + queue_tx: TicketNotifier, +) { + // This error should never occur, but if it does, do not lock up + let _permit = semaphore.acquire().await; + + let ticket_headers = if let Some(ticket_headers) = queue_tx.available() { + ticket_headers + } else { + return; + }; - if let Ok(Some(RatelimitHeaders::Global(headers))) = ticket_headers.await { - tracing::debug!(seconds = headers.retry_after(), "globally ratelimited"); + if let Ok(Some(RatelimitHeaders::Global(headers))) = ticket_headers.await { + tracing::debug!(seconds = headers.retry_after(), "globally ratelimited"); - bucket.is_locked.store(true, Ordering::Release); - tokio::time::sleep(Duration::from_secs(headers.retry_after())).await; - bucket.is_locked.store(false, Ordering::Release); - } + let _guard = bucket.is_locked.lock().await; + tokio::time::sleep(Duration::from_secs(headers.retry_after())).await; } } @@ -114,8 +127,7 @@ async fn wait_if_needed( // if time > now, wait until there is capacity available again if *time > now { - bucket.is_locked.store(true, Ordering::Release); + let _guard = bucket.is_locked.lock().await; tokio::time::sleep_until(*time).await; - bucket.is_locked.store(false, Ordering::Release); } } diff --git a/twilight-http-ratelimiting/src/in_memory/mod.rs b/twilight-http-ratelimiting/src/in_memory/mod.rs index 50b25ec9e70..a1d7848ce46 100644 --- a/twilight-http-ratelimiting/src/in_memory/mod.rs +++ b/twilight-http-ratelimiting/src/in_memory/mod.rs @@ -4,7 +4,7 @@ mod bucket; mod global_bucket; use self::bucket::{Bucket, BucketQueueTask}; -pub use self::global_bucket::GlobalBucket; +pub(crate) use self::global_bucket::GlobalBucket; use super::{ ticket::{self, TicketNotifier}, Bucket as InfoBucket, Ratelimiter, From 0ec96cdf7c10712450ec5b81095d1388b6b8e28b Mon Sep 17 00:00:00 2001 From: Greaka Date: Sun, 26 Feb 2023 16:38:43 +0100 Subject: [PATCH 5/5] fix(http-ratelimiting): global rate limit penalties are now applied correctly --- .../src/in_memory/global_bucket.rs | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs index b8ce7d41b61..cdd02ff8c7b 100644 --- 
a/twilight-http-ratelimiting/src/in_memory/global_bucket.rs +++ b/twilight-http-ratelimiting/src/in_memory/global_bucket.rs @@ -5,7 +5,8 @@ use crate::ticket::TicketNotifier; use crate::RatelimitHeaders; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{Mutex, Semaphore}; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{mpsc, Mutex, Semaphore}; use tokio::time::Instant; /// seconds per period @@ -73,19 +74,31 @@ impl InnerGlobalBucket { async fn run_global_queue_task(bucket: Arc, period: u64, requests: u32) { let mut time = Instant::now(); let semaphore = Arc::new(Semaphore::new(requests as usize)); + let (penalty_tx, mut penalty_rx) = mpsc::channel(requests as usize); while let Some(queue_tx) = bucket.queue.pop().await { - wait_if_needed(bucket.as_ref(), &mut time, period, requests).await; - - tokio::spawn(process_request(bucket.clone(), semaphore.clone(), queue_tx)); + wait_if_needed( + bucket.as_ref(), + &mut time, + period, + requests, + &mut penalty_rx, + ) + .await; + + tokio::spawn(process_request( + semaphore.clone(), + queue_tx, + penalty_tx.clone(), + )); } } #[tracing::instrument(name = "process request", skip_all)] async fn process_request( - bucket: Arc, semaphore: Arc, queue_tx: TicketNotifier, + penalties: Sender, ) { // This error should never occur, but if it does, do not lock up let _permit = semaphore.acquire().await; @@ -99,8 +112,8 @@ async fn process_request( if let Ok(Some(RatelimitHeaders::Global(headers))) = ticket_headers.await { tracing::debug!(seconds = headers.retry_after(), "globally ratelimited"); - let _guard = bucket.is_locked.lock().await; - tokio::time::sleep(Duration::from_secs(headers.retry_after())).await; + let deadline = Instant::now() + Duration::from_secs(headers.retry_after()); + penalties.send(deadline).await.ok(); } } @@ -110,24 +123,31 @@ async fn wait_if_needed( time: &mut Instant, period: u64, requests: u32, + penalties: &mut Receiver, ) { let period = Duration::from_secs(period); let fill_rate = period / requests; let now = Instant::now(); - // base contingent of 1 period worth of requests + // maximum requests at once is 1 period worth of requests let base = now - period; - // reset to base if no request came in for long enough + // if the bucket currently holds more requests than maximum, set to maximum if base > *time { *time = base; } - // we request one request worth of rate limit consumption + // deduct one request from current capacity *time += fill_rate; - // if time > now, wait until there is capacity available again + // if time > now, then the bucket is exhausted. wait until a request is available again if *time > now { let _guard = bucket.is_locked.lock().await; tokio::time::sleep_until(*time).await; } + + // wait for penalties + while let Ok(deadline) = penalties.try_recv() { + let _guard = bucket.is_locked.lock().await; + tokio::time::sleep_until(deadline).await; + } }
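
As a usage note for the API introduced in PATCH 3/5, the sketch below shows how a consumer would opt into a custom global limit. It assumes the existing crate-root re-export of InMemoryRatelimiter and a Tokio runtime (the global bucket spawns its background queue task on construction); the 1-second / 500-request figures are placeholder values for a bot with a raised global limit, not defaults taken from this series.

    use twilight_http_ratelimiting::InMemoryRatelimiter;

    #[tokio::main]
    async fn main() {
        // Default global limit: REQUESTS (50) requests per PERIOD (1 second).
        let _default = InMemoryRatelimiter::new();

        // Custom global limit, e.g. for a bot whose global limit was raised
        // to 500 requests per 1-second period.
        let _raised = InMemoryRatelimiter::with_global_ratelimit(1, 500);
    }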
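
The fill-rate accounting in wait_if_needed is the least obvious part of the series, so here is a self-contained restatement in plain std (no Tokio). The function name next_delay and the variable spent are illustrative only and do not appear in the patch; in the patch itself, global retry_after penalties additionally arrive over an mpsc channel as deadlines that the queue task sleeps on before releasing further tickets.

    use std::time::{Duration, Instant};

    // `spent` trails `now` by at most one period; each request advances it by
    // period / requests. Whatever amount `spent` ends up ahead of `now` is how
    // long the caller has to wait before proceeding.
    fn next_delay(spent: &mut Instant, period: Duration, requests: u32) -> Duration {
        let fill_rate = period / requests;
        let now = Instant::now();

        // Never bank more than one full period of unused capacity.
        let base = now - period;
        if base > *spent {
            *spent = base;
        }

        // Consume one request's worth of capacity.
        *spent += fill_rate;

        // Zero if capacity was available, otherwise the required sleep.
        spent.saturating_duration_since(now)
    }

    fn main() {
        let mut spent = Instant::now();

        // Starting with no banked capacity (as in the patch, where `time`
        // begins at task start), back-to-back calls each add one fill
        // interval, so the reported wait grows by roughly 20ms per request
        // at the default 50 requests per second.
        for i in 0..55 {
            let delay = next_delay(&mut spent, Duration::from_secs(1), 50);
            println!("request {i}: wait {delay:?}");
        }
    }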