Skip to content

Commit

Permalink
step 2 - macro
Browse files Browse the repository at this point in the history
  • Loading branch information
Dav1dde committed Nov 14, 2024
1 parent 7cea6cb commit c7e3296
Showing 1 changed file with 86 additions and 171 deletions.
257 changes: 86 additions & 171 deletions relay-server/src/utils/scheduled/queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use futures::StreamExt as _;
use priority_queue::PriorityQueue;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
Expand All @@ -15,62 +14,26 @@ use futures::Stream;

/// A scheduled queue that can be polled for when the next item is ready.
pub struct ScheduledQueue<T> {
inner: Inner<BinaryHeap<Item<T>>>,
queue: BinaryHeap<Item<T>>,
sleep: Pin<Box<tokio::time::Sleep>>,
}

impl<T> ScheduledQueue<T> {
/// Creates a new, empty [`ScheduledQueue`].
pub fn new() -> Self {
Self {
inner: Default::default(),
}
}

/// Returns the current size of the queue.
pub fn len(&self) -> usize {
self.inner.len()
}

/// Returns true if there are no items in the queue.
#[cfg_attr(not(test), expect(dead_code))]
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Schedules a new item to be yielded at `when`.
pub fn schedule(&mut self, when: Instant, value: T) {
self.inner.push(Item { when, value });
}
}

impl<T: fmt::Debug> fmt::Debug for ScheduledQueue<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let now = Instant::now();
let mut f = f.debug_list();
for Item { when, value } in self.inner.iter() {
f.entry(&(when.saturating_duration_since(now), value));
}
f.finish()
self.queue.push(Item { when, value });
}
}

impl<T> Stream for ScheduledQueue<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
fn peek_when(&self) -> Option<Instant> {
self.queue.peek().map(|item| item.when)
}
}

impl<T> FusedStream for ScheduledQueue<T> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
fn pop_value(&mut self) -> Option<T> {
self.queue.pop().map(|item| item.value)
}
}

impl<T> Default for ScheduledQueue<T> {
fn default() -> Self {
Self::new()
fn iter(&self) -> impl Iterator<Item = (Instant, &T)> + '_ {
self.queue.iter().map(|item| (item.when, &item.value))
}
}

Expand All @@ -82,166 +45,118 @@ pub struct UniqueScheduledQueue<T>
where
T: std::hash::Hash + Eq,
{
inner: Inner<PriorityQueue<T, Reverse<Instant>>>,
queue: PriorityQueue<T, Reverse<Instant>>,
sleep: Pin<Box<tokio::time::Sleep>>,
}

impl<T: std::hash::Hash + Eq> UniqueScheduledQueue<T> {
/// Creates a new, empty [`UniqueScheduledQueue`].
pub fn new() -> Self {
Self {
inner: Default::default(),
}
}

/// Returns the current size of the queue.
pub fn len(&self) -> usize {
self.inner.len()
}

/// Returns true if there are no items in the queue.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Schedules an item to be yielded at `when`.
///
/// If the item was net yet scheduled, it is inserted into the queue,
/// otherwise the previous schedule is moved to the new deadline.
pub fn schedule(&mut self, when: Instant, value: T) {
self.inner.push(value, Reverse(when));
self.queue.push(value, Reverse(when));
}

/// Removes a value from the queue.
pub fn remove(&mut self, value: &T) {
self.inner.remove(value);
}
}

impl<T: fmt::Debug + std::hash::Hash + Eq> fmt::Debug for UniqueScheduledQueue<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let now = Instant::now();
let mut f = f.debug_list();
for (value, Reverse(when)) in self.inner.iter() {
f.entry(&(when.saturating_duration_since(now), value));
}
f.finish()
}
}

impl<T: std::hash::Hash + Eq> Stream for UniqueScheduledQueue<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}

impl<T: std::hash::Hash + Eq> FusedStream for UniqueScheduledQueue<T> {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
self.queue.remove(value);
}
}
//
impl<T: std::hash::Hash + Eq> Default for UniqueScheduledQueue<T> {
fn default() -> Self {
Self::new()
}
}

trait Queue {
type Value;

/// Peeks into the queue returning the deadline of the first item.
fn peek_when(&self) -> Option<Instant>;
/// Removes the first element from the queue returning its value.
///
/// A successful [`Self::peek_when`] followed by a [`Self::pop_value`] must not return `None`.
fn pop_value(&mut self) -> Option<Self::Value>;
}

impl<T> Queue for BinaryHeap<Item<T>> {
type Value = T;

fn peek_when(&self) -> Option<Instant> {
self.peek().map(|item| item.when)
self.queue.peek().map(|(_, Reverse(when))| *when)
}

fn pop_value(&mut self) -> Option<Self::Value> {
self.pop().map(|item| item.value)
fn pop_value(&mut self) -> Option<T> {
self.queue.pop().map(|(value, _)| value)
}
}

impl<T: std::hash::Hash + Eq> Queue for priority_queue::PriorityQueue<T, Reverse<Instant>> {
type Value = T;

fn peek_when(&self) -> Option<Instant> {
self.peek().map(|(_, Reverse(when))| *when)
fn iter(&self) -> impl Iterator<Item = (Instant, &T)> + '_ {
self.queue
.iter()
.map(|(value, Reverse(when))| (*when, value))
}

fn pop_value(&mut self) -> Option<Self::Value> {
self.pop().map(|(value, _)| value)
}
}

#[derive(Debug)]
struct Inner<Q> {
inner: Q,
sleep: Pin<Box<tokio::time::Sleep>>,
}

impl<Q> std::ops::Deref for Inner<Q> {
type Target = Q;

fn deref(&self) -> &Self::Target {
&self.inner
}
}
macro_rules! impl_queue {
($name:ident, $($where:tt)*) => {
impl<T: $($where)*> $name<T> {
/// Creates a new, empty [`Self`].
pub fn new() -> Self {
Self {
queue: Default::default(),
sleep: Box::pin(tokio::time::sleep(Duration::MAX)),
}
}

impl<Q> std::ops::DerefMut for Inner<Q> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
/// Returns the current size of the queue.
#[allow(dead_code)]
pub fn len(&self) -> usize {
self.queue.len()
}

impl<Q: Default> Default for Inner<Q> {
fn default() -> Self {
Self {
inner: Default::default(),
sleep: Box::pin(tokio::time::sleep(Duration::MAX)),
/// Returns true if there are no items in the queue.
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
}
}

impl<Q> Unpin for Inner<Q> {}
impl<T: $($where)*> Default for $name<T> {
fn default() -> Self {
Self::new()
}
}

impl<Q: Queue> Stream for Inner<Q> {
type Item = Q::Value;
impl<T: $($where)*> Unpin for $name<T> {}

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(when) = self.inner.peek_when() {
// The head of the queue changed, reset the deadline.
if self.sleep.deadline() != when {
self.sleep.as_mut().reset(when);
impl<T: $($where)*> FusedStream for $name<T> {
fn is_terminated(&self) -> bool {
// The stream never returns `Poll::Ready(None)`.
false
}
}

// Poll and wait for the next item to be ready.
if self.sleep.as_mut().poll(cx).is_ready() {
// Item is ready, yield it.
let value = self.inner.pop_value().expect("pop after peek");
return Poll::Ready(Some(value));
impl<T: $($where)*> Stream for $name<T> {
type Item = T;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(when) = self.peek_when() {
// The head of the queue changed, reset the deadline.
if self.sleep.deadline() != when {
self.sleep.as_mut().reset(when);
}

// Poll and wait for the next item to be ready.
if self.sleep.as_mut().poll(cx).is_ready() {
// Item is ready, yield it.
let value = self.pop_value().expect("pop after peek");
return Poll::Ready(Some(value));
}
}

Poll::Pending
}
}

Poll::Pending
}
impl<T: $($where)*> fmt::Debug for $name<T> where T: fmt::Debug {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let now = Instant::now();
let mut f = f.debug_list();
for (when, value) in self.iter() {
f.entry(&(when.saturating_duration_since(now), value));
}
f.finish()
}
}
};
}

impl<Q: Queue> FusedStream for Inner<Q> {
fn is_terminated(&self) -> bool {
// The stream never returns `Poll::Ready(None)`.
false
}
}
impl_queue!(ScheduledQueue, Sized);
impl_queue!(UniqueScheduledQueue, std::hash::Hash + Eq);

struct Item<T> {
when: Instant,
Expand Down

0 comments on commit c7e3296

Please sign in to comment.