diff --git a/relay-server/src/utils/scheduled/queue.rs b/relay-server/src/utils/scheduled/queue.rs index b5cd88c943..600687d50d 100644 --- a/relay-server/src/utils/scheduled/queue.rs +++ b/relay-server/src/utils/scheduled/queue.rs @@ -1,4 +1,3 @@ -use futures::StreamExt as _; use priority_queue::PriorityQueue; use std::cmp::Reverse; use std::collections::BinaryHeap; @@ -15,62 +14,26 @@ use futures::Stream; /// A scheduled queue that can be polled for when the next item is ready. pub struct ScheduledQueue { - inner: Inner>>, + queue: BinaryHeap>, + sleep: Pin>, } impl ScheduledQueue { - /// Creates a new, empty [`ScheduledQueue`]. - pub fn new() -> Self { - Self { - inner: Default::default(), - } - } - - /// Returns the current size of the queue. - pub fn len(&self) -> usize { - self.inner.len() - } - - /// Returns true if there are no items in the queue. - #[cfg_attr(not(test), expect(dead_code))] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - /// Schedules a new item to be yielded at `when`. pub fn schedule(&mut self, when: Instant, value: T) { - self.inner.push(Item { when, value }); - } -} - -impl fmt::Debug for ScheduledQueue { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let now = Instant::now(); - let mut f = f.debug_list(); - for Item { when, value } in self.inner.iter() { - f.entry(&(when.saturating_duration_since(now), value)); - } - f.finish() + self.queue.push(Item { when, value }); } -} -impl Stream for ScheduledQueue { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_next_unpin(cx) + fn peek_when(&self) -> Option { + self.queue.peek().map(|item| item.when) } -} -impl FusedStream for ScheduledQueue { - fn is_terminated(&self) -> bool { - self.inner.is_terminated() + fn pop_value(&mut self) -> Option { + self.queue.pop().map(|item| item.value) } -} -impl Default for ScheduledQueue { - fn default() -> Self { - Self::new() + fn iter(&self) -> impl Iterator + '_ { + self.queue.iter().map(|item| (item.when, &item.value)) } } @@ -82,166 +45,118 @@ pub struct UniqueScheduledQueue where T: std::hash::Hash + Eq, { - inner: Inner>>, + queue: PriorityQueue>, + sleep: Pin>, } impl UniqueScheduledQueue { - /// Creates a new, empty [`UniqueScheduledQueue`]. - pub fn new() -> Self { - Self { - inner: Default::default(), - } - } - - /// Returns the current size of the queue. - pub fn len(&self) -> usize { - self.inner.len() - } - - /// Returns true if there are no items in the queue. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - /// Schedules an item to be yielded at `when`. /// /// If the item was net yet scheduled, it is inserted into the queue, /// otherwise the previous schedule is moved to the new deadline. pub fn schedule(&mut self, when: Instant, value: T) { - self.inner.push(value, Reverse(when)); + self.queue.push(value, Reverse(when)); } /// Removes a value from the queue. pub fn remove(&mut self, value: &T) { - self.inner.remove(value); - } -} - -impl fmt::Debug for UniqueScheduledQueue { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let now = Instant::now(); - let mut f = f.debug_list(); - for (value, Reverse(when)) in self.inner.iter() { - f.entry(&(when.saturating_duration_since(now), value)); - } - f.finish() - } -} - -impl Stream for UniqueScheduledQueue { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_next_unpin(cx) - } -} - -impl FusedStream for UniqueScheduledQueue { - fn is_terminated(&self) -> bool { - self.inner.is_terminated() + self.queue.remove(value); } -} -// -impl Default for UniqueScheduledQueue { - fn default() -> Self { - Self::new() - } -} - -trait Queue { - type Value; - - /// Peeks into the queue returning the deadline of the first item. - fn peek_when(&self) -> Option; - /// Removes the first element from the queue returning its value. - /// - /// A successful [`Self::peek_when`] followed by a [`Self::pop_value`] must not return `None`. - fn pop_value(&mut self) -> Option; -} - -impl Queue for BinaryHeap> { - type Value = T; fn peek_when(&self) -> Option { - self.peek().map(|item| item.when) + self.queue.peek().map(|(_, Reverse(when))| *when) } - fn pop_value(&mut self) -> Option { - self.pop().map(|item| item.value) + fn pop_value(&mut self) -> Option { + self.queue.pop().map(|(value, _)| value) } -} - -impl Queue for priority_queue::PriorityQueue> { - type Value = T; - fn peek_when(&self) -> Option { - self.peek().map(|(_, Reverse(when))| *when) + fn iter(&self) -> impl Iterator + '_ { + self.queue + .iter() + .map(|(value, Reverse(when))| (*when, value)) } - - fn pop_value(&mut self) -> Option { - self.pop().map(|(value, _)| value) - } -} - -#[derive(Debug)] -struct Inner { - inner: Q, - sleep: Pin>, } -impl std::ops::Deref for Inner { - type Target = Q; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} +macro_rules! impl_queue { + ($name:ident, $($where:tt)*) => { + impl $name { + /// Creates a new, empty [`Self`]. + pub fn new() -> Self { + Self { + queue: Default::default(), + sleep: Box::pin(tokio::time::sleep(Duration::MAX)), + } + } -impl std::ops::DerefMut for Inner { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} + /// Returns the current size of the queue. + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.queue.len() + } -impl Default for Inner { - fn default() -> Self { - Self { - inner: Default::default(), - sleep: Box::pin(tokio::time::sleep(Duration::MAX)), + /// Returns true if there are no items in the queue. + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } - } -} -impl Unpin for Inner {} + impl Default for $name { + fn default() -> Self { + Self::new() + } + } -impl Stream for Inner { - type Item = Q::Value; + impl Unpin for $name {} - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(when) = self.inner.peek_when() { - // The head of the queue changed, reset the deadline. - if self.sleep.deadline() != when { - self.sleep.as_mut().reset(when); + impl FusedStream for $name { + fn is_terminated(&self) -> bool { + // The stream never returns `Poll::Ready(None)`. + false } + } - // Poll and wait for the next item to be ready. - if self.sleep.as_mut().poll(cx).is_ready() { - // Item is ready, yield it. - let value = self.inner.pop_value().expect("pop after peek"); - return Poll::Ready(Some(value)); + impl Stream for $name { + type Item = T; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(when) = self.peek_when() { + // The head of the queue changed, reset the deadline. + if self.sleep.deadline() != when { + self.sleep.as_mut().reset(when); + } + + // Poll and wait for the next item to be ready. + if self.sleep.as_mut().poll(cx).is_ready() { + // Item is ready, yield it. + let value = self.pop_value().expect("pop after peek"); + return Poll::Ready(Some(value)); + } + } + + Poll::Pending } } - Poll::Pending - } + impl fmt::Debug for $name where T: fmt::Debug { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let now = Instant::now(); + let mut f = f.debug_list(); + for (when, value) in self.iter() { + f.entry(&(when.saturating_duration_since(now), value)); + } + f.finish() + } + } + }; } -impl FusedStream for Inner { - fn is_terminated(&self) -> bool { - // The stream never returns `Poll::Ready(None)`. - false - } -} +impl_queue!(ScheduledQueue, Sized); +impl_queue!(UniqueScheduledQueue, std::hash::Hash + Eq); struct Item { when: Instant,