[RFC] Timeout support #29

Open
tatetian opened this issue Jul 20, 2021 · 4 comments

@tatetian
Contributor

Motivation

It is common for event-waiting logic to be bounded by a timeout. Currently, however, all NGO code waits forever if no interesting event arrives. Since many system calls can wait with a timeout, we need a general mechanism to support timeouts.

Background

Currently, all event wait/wakeup code in NGO ultimately relies on Waiter/WaiterQueue, provided by the async-rt crate. A convenience macro named waiter_loop covers the most common pattern for using Waiter/WaiterQueue: 1) try to make progress on a task; 2) if it is not ready, wait for a notification before trying again.

Here are some examples.

Exit/Wait4

The exit and wait4 system calls use Waiter and WaiterQueue.

// File: libos/process/do_wait4.rs

pub async fn do_wait4(child_filter: &ProcessFilter) -> Result<(pid_t, i32)> {
    let thread = current!();
    let process = thread.process();

    waiter_loop!(process.exit_waiters(), {
        // Lock order: always lock parent then child to avoid deadlock
        let mut process_inner = process.inner();

        let unwaited_children = /* get all possible children */;

        if unwaited_children.len() == 0 {
            return_errno!(ECHILD, "Cannot find any unwaited children");
        }

        // Return immediately if a child that we wait for has already exited
        let zombie_child = unwaited_children
            .iter()
            .find(|child| child.status() == ProcessStatus::Zombie);
        if let Some(zombie_child) = zombie_child {
            let zombie_pid = zombie_child.pid();
            let exit_status = free_zombie_child(&process, process_inner, zombie_pid);
            return Ok((zombie_pid, exit_status));
        }
    });
}

// File: libos/process/do_exit.rs

fn exit_process(thread: &ThreadRef, term_status: TermStatus) {
    // ...

    // Notify the parent that this child process's status has changed
    parent.exit_waiters().wake_all();

    // ...
}

Sigtimedwait

The sigtimedwait system call also uses Waiter and WaiterQueue to wait for signals.

pub async fn do_sigtimedwait(interest: SigSet, timeout: Option<&Duration>) -> Result<siginfo_t> {
    if let Some(timeout) = timeout {
        warn!("do not support timeout yet");
    }

    let thread = current!();
    let process = thread.process().clone();

    // Interesting, blocked signals
    let interest = {
        let blocked = thread.sig_mask().read().unwrap();
        *blocked & interest
    };

    // Loop until we find a pending signal or reach timeout
    waiter_loop!(process.sig_waiters(), {
        if let Some(signal) = dequeue_pending_signal(&interest, &thread, &process) {
            let siginfo = signal.to_info();
            return Ok(siginfo);
        }
    });
}

Futex

Futexes are also backed by Waiter and WaiterQueue.

pub async fn futex_wait_bitset(
    futex_addr: *const i32,
    futex_val: i32,
    timeout: &Option<Duration>,
    bitset: u32,
) -> Result<()> {
    let waiter = Waiter::new();

    let futex_item = FutexItem::new(futex_key, bitset, waiter.waker());
    futex_bucket.enqueue_item(futex_item.clone());
    // Must make sure that no locks are held by this thread before waiting
    drop(futex_bucket);

    waiter.wait(/*timeout.as_ref()*/).await;
	
    Ok(())
}

I/O notifications

I/O notifications, due to their importance and ubiquity, are implemented with three new abstractions: Poller, Pollee, and Events.

Here is a simplified excerpt from StreamSocket.

impl<A: Addr + 'static, R: Runtime> ConnectedStream<A, R> {
    pub async fn readv(self: &Arc<Self>, bufs: &mut [&mut [u8]]) -> Result<usize> {
        let total_len: usize = bufs.iter().map(|buf| buf.len()).sum();
        if total_len == 0 {
            return Ok(0);
        }

        // Initialize the poller only when needed
        let mut poller = None;
        loop {
            // Attempt to read
            let res = self.try_readv(bufs);
            if !res.has_errno(EAGAIN) {
                return res;
            }

            // Wait for interesting events by polling
            if poller.is_none() {
                poller = Some(Poller::new());
            }
            let mask = Events::IN;
            let events = self.common.pollee().poll_by(mask, poller.as_mut());
            if events.is_empty() {
                poller.as_ref().unwrap().wait().await;
            }
        }
    }
}

But behind the scenes, Poller and Pollee are also implemented with Waiter and WaiterQueue.

Proposed solution

The new interface

From the examples above, we can see that supporting timeouts only requires adding a timeout to the Waiter::wait and Poller::wait methods. And since the latter is built on Waiter, the problem boils down to supporting a timeout in the Waiter::wait method.

/// File: crates/async-rt/wait/Waiter.rs

impl Waiter {
    /// The new version of the wait method, with a timeout argument added.
    ///
    /// Awaiting the returned future will block until the waiter is woken up or
    /// the timeout expires (if the timeout is `Some(_)`).
    ///
    /// The output type of the Future is `Result<()>`. If the waiter is woken up,
    /// then the result is `Ok`. Otherwise, the result is `Err(ETIMEDOUT)`.
    ///
    /// If the timeout is `Some(_)`, then the contained `Duration` will be updated
    /// to reflect the remaining time when the method returns.
    pub fn wait(&self, timeout: Option<&mut Duration>) -> WaitFuture<'_> {
        // Delegate to WaiterInner (assumed here to be held in a field named `inner`).
        self.inner.wait(timeout)
    }
}

impl WaiterInner {
    pub fn wait(&self, timeout: Option<&mut Duration>) -> WaitFuture<'_> {
        WaitFuture::new(self, timeout)
    }
}

pub struct WaitFuture<'a> {
    waiter: &'a WaiterInner,
    timeout: Option<&'a mut Duration>,
}

impl<'a> Future for WaitFuture<'a> {
    type Output = Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!("let's discuss the implementation detail later")
    }
}

Since the waiter_loop macro does not take a timeout argument, we can either extend the macro with an extra argument or create a new macro named waiter_loop_with_timeout (any better name?). Either way, the timeout-aware macro should evaluate to a Result<()> for the loop, so that the caller can capture and handle timeout errors.
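The doc comment above says the contained Duration is updated to reflect the remaining time. A minimal, std-only sketch of that bookkeeping could look like the following. The names `TimedOut` and `charge_elapsed` are hypothetical stand-ins for illustration; they are not part of async-rt.

```rust
use std::time::Duration;

/// Hypothetical error type standing in for the crate's timeout errno.
#[derive(Debug, PartialEq)]
pub struct TimedOut;

/// Deducts `elapsed` from the caller's timeout budget, if there is one.
/// Returns `Err(TimedOut)` once the budget is used up.
pub fn charge_elapsed(
    timeout: Option<&mut Duration>,
    elapsed: Duration,
) -> Result<(), TimedOut> {
    match timeout {
        // No timeout was given: the caller is willing to wait forever.
        None => Ok(()),
        Some(remaining) => {
            // Saturate at zero instead of panicking on underflow.
            *remaining = remaining.saturating_sub(elapsed);
            if remaining.is_zero() {
                Err(TimedOut)
            } else {
                Ok(())
            }
        }
    }
}
```

A timeout-aware loop could call such a helper after every wakeup, so that repeated waits share one budget instead of restarting the full timeout each time.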

The refined interface

It is quite common to invoke Waiter::wait in a loop. But due to Rust's ownership and move semantics, the following code is not valid.

fn imagined_blocking_syscall(timeout: Option<&mut Duration>) {
    let waiter = Waiter::new();
    loop {
        waiter.wait(timeout);
    }
}

An error of "use of moved value" will be reported by the Rust compiler.

To work around this problem, one solution is to redefine the signature of the wait method.

impl Waiter {
    pub fn wait<T>(&self, timeout: Option<&mut T>) -> WaitFuture<'_>
        where T: BorrowMut<Duration>
    {
        // Delegate to WaiterInner (assumed here to be held in a field named `inner`).
        self.inner.wait(timeout)
    }
}

This way, the wait method can accept both Option<&mut Duration> and Option<&mut &mut Duration> as its argument. Thus, we can rewrite the loop as the following.

fn imagined_blocking_syscall(mut timeout: Option<&mut Duration>) {
    let waiter = Waiter::new();
    loop {
        // `as_mut` needs a mutable binding and yields Option<&mut &mut Duration>
        waiter.wait(timeout.as_mut());
    }
}
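To see the borrowing trick in isolation, here is a small, synchronous sketch that needs only std. `consume_one_ms`, `spin` and `spin_deref` are hypothetical names used for illustration. The second loop shows an alternative that needs no generic parameter at the call site: `Option::as_deref_mut` reborrows the inner `&mut Duration` directly.

```rust
use std::borrow::BorrowMut;
use std::time::Duration;

/// Stand-in for `Waiter::wait`: uses up 1 ms of the timeout, if any.
pub fn consume_one_ms<T: BorrowMut<Duration>>(timeout: Option<&mut T>) {
    if let Some(t) = timeout {
        let d: &mut Duration = <T as BorrowMut<Duration>>::borrow_mut(t);
        *d = d.saturating_sub(Duration::from_millis(1));
    }
}

/// Here `T = &mut Duration`: `as_mut` yields `Option<&mut &mut Duration>`,
/// so the original option is only borrowed, not moved, on each iteration.
pub fn spin(mut timeout: Option<&mut Duration>, rounds: u32) {
    for _ in 0..rounds {
        consume_one_ms(timeout.as_mut());
    }
}

/// Here `T = Duration`: `as_deref_mut` yields a fresh `Option<&mut Duration>`.
pub fn spin_deref(mut timeout: Option<&mut Duration>, rounds: u32) {
    for _ in 0..rounds {
        consume_one_ms(timeout.as_deref_mut());
    }
}
```

Both loops compile because each iteration takes a short-lived reborrow of `timeout` rather than consuming it.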

Changes to the wait method

Now let's figure out the implementation of the wait method. Assume that we have a decorator future, Timeout, that wraps any future into a new future that completes when either the inner future completes or the timeout expires.

With this convenient new Timeout primitive, we can rewrite the Waiter::wait method as follows.

impl Waiter {
    pub fn wait<T>(&self, timeout: Option<&mut T>) -> Timeout<WaitFuture<'_>>
        where T: BorrowMut<Duration>
    {
        // The inner, timeout-free wait is assumed to live on the `inner` field.
        Timeout::new(self.inner.wait(), timeout)
    }
}

Make WaitFuture cancel-safe

Cancelling Rust futures is still an open problem. Simply dropping a future before its completion may even introduce memory safety issues. Currently, there is no good-enough, language-level solution. See this article for more info.

Luckily, in this proposal, we only need to cancel one very specific future: WaitFuture. The original version of WaitFuture is written on the assumption that the future always runs to completion.

impl<'a> Future for WaitFuture<'a> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut raw_waker = self.waiter.raw_waker.lock();
        match self.waiter.state() {
            WaiterState::Idle => {
                self.waiter.set_state(WaiterState::Waiting);

                *raw_waker = Some(cx.waker().clone());
                Poll::Pending
            }
            WaiterState::Waiting => {
                *raw_waker = Some(cx.waker().clone());
                Poll::Pending
            }
            WaiterState::Woken => {
                debug_assert!(raw_waker.is_none());
                Poll::Ready(())
            }
        }
    }
}

Simply dropping a WaitFuture may leave the Waiter / WaiterInner in an unexpected state. To fix this issue, we can implement the Drop trait for WaitFuture to ensure that the state of Waiter / WaiterInner stays consistent even when a WaitFuture is cancelled.

impl<'a> Drop for WaitFuture<'a> {
    fn drop(&mut self) {
        let mut raw_waker = self.waiter.raw_waker.lock();
        // A future dropped while still waiting must not leave a stale waker behind
        if let WaiterState::Waiting = self.waiter.state() {
            *raw_waker = None;
            self.waiter.set_state(WaiterState::Idle);
        }
    }
}
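The effect of the Drop implementation can be checked with a simplified, std-only model of the waiter. This is a sketch under assumptions, not the async-rt code: state and waker share one `std::sync::Mutex`, `wake` is reduced to the bare minimum, and `poll_once` with its no-op waker is a hypothetical test helper.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WaiterState { Idle, Waiting, Woken }

struct Inner { state: WaiterState, waker: Option<Waker> }

/// Simplified model of a waiter (not the real async-rt type).
pub struct Waiter { inner: Mutex<Inner> }

impl Waiter {
    pub fn new() -> Self {
        Waiter { inner: Mutex::new(Inner { state: WaiterState::Idle, waker: None }) }
    }
    pub fn state(&self) -> WaiterState { self.inner.lock().unwrap().state }
    pub fn wait(&self) -> WaitFuture<'_> { WaitFuture { waiter: self } }
    pub fn wake(&self) {
        let waker = {
            let mut inner = self.inner.lock().unwrap();
            inner.state = WaiterState::Woken;
            inner.waker.take()
        };
        // Invoke the waker only after the lock has been released.
        if let Some(w) = waker { w.wake(); }
    }
}

pub struct WaitFuture<'a> { waiter: &'a Waiter }

impl Future for WaitFuture<'_> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut inner = self.waiter.inner.lock().unwrap();
        match inner.state {
            WaiterState::Woken => Poll::Ready(()),
            WaiterState::Idle | WaiterState::Waiting => {
                inner.state = WaiterState::Waiting;
                inner.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

impl Drop for WaitFuture<'_> {
    fn drop(&mut self) {
        let mut inner = self.waiter.inner.lock().unwrap();
        // Cancelled while waiting: forget the waker and go back to Idle.
        if inner.state == WaiterState::Waiting {
            inner.waker = None;
            inner.state = WaiterState::Idle;
        }
    }
}

struct NoopWake;
impl Wake for NoopWake { fn wake(self: Arc<Self>) {} }

/// Polls a future exactly once with a waker that does nothing.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

In this model, polling a WaitFuture once and then dropping it returns the waiter to Idle, so the same waiter can be reused for the next wait.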

Implement Timeout<F: Future>

Needs to be added :)

@ShuochengWang
Contributor

ShuochengWang commented Jul 22, 2021

Implement Timeout<F: Future>

Timeout is a decorator future that wraps any future into a new future that completes when either the inner future completes or the timeout expires.

Timeout has two fields:

  • future: the wrapped future.
  • timer: the timer, which expires after a given duration.

Note that Timeout must be pinned. We can use the pin_project crate to implement it easily.

pin_project! {
    pub struct Timeout<F> {
        #[pin]
        future: F,
        #[pin]
        timer: Timer,
    }
}

impl<F> Timeout<F> {
    pub fn new(future: F, timer: Timer) -> Timeout<F> {
        Timeout { future, timer }
    }
}

Since Timeout is a Future, we need to implement the Future trait for it:

impl<F: Future> Future for Timeout<F>
{
    type Output = Result<F::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        if let Poll::Ready(output) = this.future.poll(cx) {
            return Poll::Ready(Ok(output));
        }

        match this.timer.poll(cx) {
            Poll::Ready(_) => Poll::Ready(Err("The timer expired")),
            Poll::Pending => Poll::Pending
        }
    }
}

If the timer expires before the wrapped future completes, the timer wakes the task so that Timeout is polled again. In that poll(), if the future is still not ready, Timeout returns Ready(Err(_)) to report the timeout.
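The polling order described above can be tried out without any external crate. The sketch below is an assumption-laden variant of the code above: it boxes both inner futures instead of using pin projection, takes the timer as a generic future, and uses a hypothetical `FlagTimer` that fires when a shared flag is set. `FlagTimer` does not register a waker, so it is only suitable for manual polling as done here.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical error type reported when the timer fires first.
#[derive(Debug, PartialEq)]
pub struct TimedOut;

/// Boxing both futures keeps `Timeout` itself `Unpin`, so no projection is needed.
pub struct Timeout<F, T> {
    future: Pin<Box<F>>,
    timer: Pin<Box<T>>,
}

impl<F: Future, T: Future<Output = ()>> Timeout<F, T> {
    pub fn new(future: F, timer: T) -> Self {
        Timeout { future: Box::pin(future), timer: Box::pin(timer) }
    }
}

impl<F: Future, T: Future<Output = ()>> Future for Timeout<F, T> {
    type Output = Result<F::Output, TimedOut>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The wrapped future is polled first, so a result that is already
        // available wins over a timer that expired at the same time.
        if let Poll::Ready(output) = self.future.as_mut().poll(cx) {
            return Poll::Ready(Ok(output));
        }
        match self.timer.as_mut().poll(cx) {
            Poll::Ready(()) => Poll::Ready(Err(TimedOut)),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Test stand-in for a timer: ready once the shared flag is set.
pub struct FlagTimer(pub Arc<AtomicBool>);

impl Future for FlagTimer {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0.load(Ordering::Acquire) { Poll::Ready(()) } else { Poll::Pending }
    }
}

struct NoopWake;
impl Wake for NoopWake { fn wake(self: Arc<Self>) {} }

/// Polls a future exactly once with a waker that does nothing.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Boxing costs two heap allocations per Timeout; the pin_project approach avoids them and is likely the better fit for a no_std runtime.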

@ShuochengWang
Contributor

ShuochengWang commented Jul 22, 2021

Integrate timeout support for async_rt crate

To fully support timeouts in the async_rt crate, we need:

  • Time Expression: no_std. E.g., Instant and Duration. Instant::now() is supported through the Time Acquisition interface.
  • Time Acquisition: Get the time through the Linux vDSO. If that fails, fall back to an OCALL.
  • Timer: no_std. See below for details.
  • Timeout: no_std. See above for details.
  • Waiter: no_std. We already have it.

Users use the Waiter interface. The Waiter is implemented with Timeout, and Timeout is implemented with Timer.
Time Expression + Time Acquisition + Timer = the underlying timeout implementation.

Time Expression

  • Solution 1: We can use the time crate. Although this crate supports no_std, its Instant struct needs std. We would need to make some modifications to adapt it to our vdso-time crate.
  • Solution 2: We implement it ourselves, since we don't need much functionality.

Time Acquisition

We already have the vdso-time crate; we just need to add the fallback to it.

Timer: The key to implementing Timeout

Each timer has a duration or an instant. When the timer expires, we need to be notified and (optionally) trigger some action.

Timer model

According to the Hashed and Hierarchical Timing Wheels paper, the timer model is composed of two user-facing operations, start and stop, and two internal operations, per-tick bookkeeping and expiry processing.

  • start: start a timer.
  • stop: stop and remove a timer.
  • per-tick bookkeeping: happens on every tick of the timer clock; maintains the state of all timers.
  • expiry processing: runs when a timer expires and triggers the associated action.

To implement timers efficiently, we consider the following schemes:

scheme 1: io-uring

When starting a timer, we submit a timeout request to the io-uring submission queue. When the request completes, we are notified and can trigger the expiry action.

Drawback: Too much pressure on io-uring. The latency can be high.

scheme 2: unordered Timer List

Keep an unordered list of timers and track the remaining time for each. Per-tick bookkeeping will maintain all timers' remaining time.

start is O(1), stop is O(1), per-tick bookkeeping is O(n).

scheme 3: ordered Timer List

Keep a list of timers as in scheme 2, but record the absolute expiry time (not remaining time) and keep the timer list ordered by this expiry time (with the timers closest to expiry at the head of the list).

start is O(n), stop is O(1), per-tick bookkeeping is O(1).

scheme 4: Timer tree

Similar to scheme 3, but uses a priority queue or an ordered tree.

start is O(log(n)), stop is O(log(n)), per-tick bookkeeping is O(1).
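As a rough illustration of scheme 4, the sketch below keeps timers in a `std::collections::BinaryHeap` ordered by absolute expiry tick. `TimerHeap` and the `u32` timer ids are hypothetical. `stop` is left out: `BinaryHeap` has no removal by key, so reaching the O(log(n)) stop quoted above would need an indexed heap or an ordered tree.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Min-heap of (absolute expiry tick, timer id).
pub struct TimerHeap {
    heap: BinaryHeap<Reverse<(u64, u32)>>,
}

impl TimerHeap {
    pub fn new() -> Self {
        TimerHeap { heap: BinaryHeap::new() }
    }

    /// start: O(log n) insertion.
    pub fn start(&mut self, expires_at: u64, id: u32) {
        self.heap.push(Reverse((expires_at, id)));
    }

    /// Per-tick bookkeeping: only the head is inspected, so a tick with
    /// nothing due costs O(1). Each expired timer costs one O(log n) pop.
    pub fn expire(&mut self, now: u64) -> Vec<u32> {
        let mut fired = Vec::new();
        while let Some(Reverse((at, id))) = self.heap.peek().copied() {
            if at > now {
                break;
            }
            self.heap.pop();
            fired.push(id);
        }
        fired
    }
}
```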

scheme 5: Simple Timing Wheels


When all timers have a maximum period of no more than MaxInterval, we can construct a circular buffer with MaxInterval slots (each representing one tick). The current time is represented by an index into the buffer. To insert a timer that expires j time units in the future, we move j slots around the ring and add the timer to a list of timers held in that slot. Every tick, the current time index moves one slot around the ring and does expiry processing on all timers held in the new slot.

Start, stop, and per-tick bookkeeping are all O(1).
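A minimal sketch of such a wheel, assuming delays are given in whole ticks and timers are identified by a hypothetical `u32` id. `stop` is omitted; making it O(1) would need per-slot linked lists with handles rather than a `Vec`.

```rust
/// Circular buffer with one slot per tick.
pub struct SimpleWheel {
    slots: Vec<Vec<u32>>,
    current: usize,
}

impl SimpleWheel {
    pub fn new(max_interval: usize) -> Self {
        assert!(max_interval > 0, "the wheel needs at least one slot");
        SimpleWheel { slots: vec![Vec::new(); max_interval], current: 0 }
    }

    /// start: O(1). Rejects delays outside 1..=max_interval.
    pub fn start(&mut self, delay: usize, id: u32) -> bool {
        if delay == 0 || delay > self.slots.len() {
            return false;
        }
        let slot = (self.current + delay) % self.slots.len();
        self.slots[slot].push(id);
        true
    }

    /// Per-tick bookkeeping: advance one slot and hand back everything in it.
    pub fn tick(&mut self) -> Vec<u32> {
        self.current = (self.current + 1) % self.slots.len();
        std::mem::take(&mut self.slots[self.current])
    }
}
```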

scheme 6: Hashing Wheel with Ordered Timer Lists


If MaxInterval is comparatively large, simple timing wheels can use a lot of memory.
Instead of using one slot per time unit, we can use a form of hashing. Construct a circular buffer with a fixed number of slots and have the current time index advance one position in the ring on each tick, as before. To insert a timer that expires j time units in the future, compute a slot delta s = j % num-buckets. Insert the timer s slots around the ring, together with its time of expiry. Since there may be many timers in any given slot, we maintain an ordered list of timers for each slot.

start / stop: average O(1), worst O(n).
Per-tick bookkeeping: O(1).

scheme 7: Hashing Wheel with Unordered Timer Lists

This is a variant on scheme 6 where instead of storing absolute time of expiry we store a count of how many times around the ring each timer is in the future. To insert a timer that expires j time units in the future, compute a counter value c = j / num-buckets and a slot delta s = j % num-buckets. Insert the timer s slots around the ring with its counter value c. Keep the timers in an unordered list in each slot.

start / stop: O(1).
Per-tick bookkeeping: average O(1), worst O(n).
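The counter arithmetic of scheme 7 can be sketched as follows, again with hypothetical names and `u32` timer ids. One detail is an assumption of this sketch rather than something stated above: when s is 0 the timer lands in the current slot, which is first visited a full revolution later, so one round is deducted at insertion time.

```rust
/// Hashed wheel where every slot holds an unordered list of
/// (rounds still to go, timer id).
pub struct HashedWheel {
    slots: Vec<Vec<(u64, u32)>>,
    current: usize,
}

impl HashedWheel {
    pub fn new(num_buckets: usize) -> Self {
        assert!(num_buckets > 0, "the wheel needs at least one slot");
        HashedWheel { slots: vec![Vec::new(); num_buckets], current: 0 }
    }

    /// start: O(1). `j` is the delay in ticks and must be at least 1.
    pub fn start(&mut self, j: u64, id: u32) {
        assert!(j > 0, "a timer must expire in the future");
        let n = self.slots.len() as u64;
        let c = j / n; // full revolutions
        let s = (j % n) as usize; // slot delta
        // s == 0 means the first visit already accounts for one revolution.
        let rounds = if s == 0 { c - 1 } else { c };
        let slot = (self.current + s) % self.slots.len();
        self.slots[slot].push((rounds, id));
    }

    /// Per-tick bookkeeping: every timer in the new slot is touched once.
    pub fn tick(&mut self) -> Vec<u32> {
        self.current = (self.current + 1) % self.slots.len();
        let bucket = std::mem::take(&mut self.slots[self.current]);
        let mut fired = Vec::new();
        for (rounds, id) in bucket {
            if rounds == 0 {
                fired.push(id);
            } else {
                self.slots[self.current].push((rounds - 1, id));
            }
        }
        fired
    }
}
```

The worst case for a tick is O(n) because all n timers may hash to the same slot, which matches the bound quoted above.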

scheme 8: Hierarchical Timing Wheels


Another way to deal with the memory issues caused by the simple timing wheel approach is to use multiple timing wheels in a hierarchy. Suppose we want to store timers with second granularity, that can be set for up to 100 days in the future. We might construct four wheels:

  • A days wheel with 100 slots
  • An hours wheel with 24 slots
  • A minutes wheel with 60 slots
  • A seconds wheel with 60 slots

This is a total of 244 slots to address a total of 8.64 million possible timer values. Every time we make one complete revolution in a wheel, we advance the next biggest wheel by one slot (the paper describes a slight variation with minute, hour, and day ticking clocks, but the effect is the same). For example, when the seconds wheel has rotated back to index ‘0’ we move the index pointer in the minutes wheel round by one position. We then take all the timers in that slot on the minutes wheel (which are now due to expire within the next 60 seconds) and insert them into their correct positions in the seconds wheel. Expiry time processing in the seconds wheel works exactly as described in scheme 5 (it’s just a simple timing wheel that happens to get replenished on every revolution).
To insert a timer, find the first wheel (from largest units to smallest) for which the timer should expire 1 or more wheel-units into the future. For example, a timer due to expire 11 hours, 15 minutes and 15 seconds into the future would be inserted at slot ‘current-index + 11’ in the hours wheel, storing the remainder of 15 minutes and 15 seconds with the timer. After the hours wheel has advanced by 11 positions, this timer will be removed from that wheel and inserted at ‘current index + 15’ slots round in the minutes wheel, storing the remainder of 15 seconds. When the minutes wheel has subsequently advanced by 15 positions, this timer will be removed from that wheel and placed at ‘current index + 15’ slots round in the seconds wheel. 15 seconds later, the timer will expire!
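The insertion rule can be written down as a few lines of arithmetic. The sketch below only computes where a timer goes (wheel, slot offset, remainder); the `place` function and the `WHEELS` table are hypothetical names, and the wheels themselves are not modelled.

```rust
/// Wheel units in seconds, from largest to smallest.
const WHEELS: [(&str, u64); 4] = [
    ("days", 86_400),
    ("hours", 3_600),
    ("minutes", 60),
    ("seconds", 1),
];

/// Returns (wheel name, slots ahead of the current index, remainder in
/// seconds that is stored with the timer), or None for a zero delay.
pub fn place(delay_secs: u64) -> Option<(&'static str, u64, u64)> {
    for &(name, unit) in WHEELS.iter() {
        // First wheel for which the timer is at least one unit away.
        if delay_secs >= unit {
            return Some((name, delay_secs / unit, delay_secs % unit));
        }
    }
    None
}
```

For the example above, 11 hours 15 minutes 15 seconds is 40515 seconds: `place(40515)` selects the hours wheel at offset 11 with a remainder of 915 seconds, `place(915)` then selects the minutes wheel at offset 15 with remainder 15, and `place(15)` selects the seconds wheel at offset 15.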

Preferred scheme: Hierarchical Timing Wheels

Timer Wheels:

Multi-level, e.g., six levels.

  • 1ms wheel, 64 slots (0 - 63ms)
  • 64ms wheel, 64 slots (64ms - 64*64ms)
  • 64*64ms wheel, 64 slots.
  • 64*64*64ms wheel, 64 slots.
  • 64*64*64*64ms wheel, 64 slots.
  • 64*64*64*64*64ms wheel, 64 slots.

Each level has a current index. All levels share one latest absolute time (the time of the latest per-tick bookkeeping).

start a timer

Store the expiry duration in the timer, and insert the timer into the corresponding slot.
Since we know the time of the latest per-tick bookkeeping, the current index, and the expiry duration, we can find the slot without getting the real time.
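With 64 slots per level, both the level and the slot fall out of bit arithmetic. The following is one possible mapping, sketched under assumptions: the level is chosen from the delay, the slot from the absolute expiry time (latest bookkeeping time plus delay), and all times are whole milliseconds. `level_for` and `slot_for` are hypothetical names; a real implementation may pick the level differently.

```rust
const SLOT_BITS: u32 = 6; // 64 slots per level
const LEVELS: u32 = 6;

/// Level whose slot granularity (64^level ms) covers `delay_ms`.
/// Returns None if the delay exceeds the range of the top level (64^6 ms).
pub fn level_for(delay_ms: u64) -> Option<u32> {
    if delay_ms >= 1u64 << (SLOT_BITS * LEVELS) {
        return None;
    }
    if delay_ms == 0 {
        return Some(0);
    }
    // Number of bits needed to represent the delay, in groups of six.
    let significant_bits = 64 - delay_ms.leading_zeros();
    Some((significant_bits - 1) / SLOT_BITS)
}

/// (level, slot index) for a timer started at `now_ms`, the time of the
/// latest per-tick bookkeeping. No call to a real clock is needed.
pub fn slot_for(now_ms: u64, delay_ms: u64) -> Option<(u32, usize)> {
    let level = level_for(delay_ms)?;
    let expires_at = now_ms.checked_add(delay_ms)?;
    let slot = (expires_at >> (SLOT_BITS * level)) & 63;
    Some((level, slot as usize))
}
```

Six levels of 64 slots cover delays up to 2^36 ms, which is roughly 795 days, using only 384 slots.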

per-tick bookkeeping

Get the real time, then update the current index and the latest absolute time.
If we meet a slot that holds timers while advancing the index, move those timers to a lower level. If the slot is in the lowest level, stop those timers and do expiry processing.

stop and expiry processing

Wake up the timer's waker and remove the timer.

When to tick?

The wheels should be guarded by a lock. Before each scheduling pass, we try to acquire the lock. If we get it, we tick once; otherwise we just do the scheduling.

Optimization to tick?

There may be no timer close to expiry in the wheels. In that case ticking is wasteful, since each tick needs to get the real time through the vDSO or an OCALL.

We can have a flag named has_near_timer. If the flag is true, we need to tick before scheduling. If the flag is false, we don't tick.
By default, the flag is false.
When starting a timer, check the expiry duration.

  • If the timer belongs to the lowest level, set has_near_timer to true.
  • If has_near_timer is false and the timer doesn't belong to the lowest level, use io-uring to send a timeout request. When this request completes, set has_near_timer to true.

Reference:
paper reading blog: Hashed and Hierarchical Timing Wheels
paper: Hashed and Hierarchical Timing Wheels
kafka's hierarchical timing wheels
tokio's timer

@tatetian
Contributor Author

The timeout design looks good. I didn't dive into the Hierarchical Timing Wheels scheme, but having a design backed by a paper looks promising to me. I am hoping that there is already a Rust implementation of the algorithm. If not, I think we can implement a simplified, good-enough version for now and leave the complete implementation for the future.

Could you break down the design into actionable tasks?

@ShuochengWang
Contributor

ShuochengWang commented Jul 23, 2021

actionable tasks

  • time expression.
    Although there are no_std time crates, e.g., time, their Instant struct depends on std. If we want to integrate our vdso-time crate, we would need to modify these crates.
    I think the better way is to implement it ourselves. We only need to implement the Instant struct and integrate it with the vdso-time crate.
    It's almost done.

  • time acquisition.
    We already have the vdso-time crate; all we need to do is to support the fallback in it.

  • timer wheels.
    related crates:

    • wheel_timer: Simple hashed wheel timer with bounded interval. Hashed timer only, too simple.
    • pendulum-rs: Data structures and runtimes for timer management. Hashed timer only, more mature.
    • ferris: A hierarchical timer wheel in Rust. Too simple.
    • rust-hash-wheel-timer: a low-level event timer implementation based on hierarchical hash wheels. More mature.
    • tokio/time/driver/wheel: the timer wheel part of the tokio crate. A hierarchical timer wheel, but complex; cannot be used directly.

    We can implement a simplified version for now.

  • integrate timer wheels to scheduler.

  • timeout.

  • new interface of waiter.
