Skip to content

Commit

Permalink
Merge pull request #3656 from bugadani/timer-queue-special
Browse files Browse the repository at this point in the history
Remove special handling of integrated timer queues and items
  • Loading branch information
Dirbaio authored Dec 17, 2024
2 parents 9cf037b + c3c571e commit c504ae8
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 187 deletions.
1 change: 1 addition & 0 deletions embassy-executor/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- embassy-executor no longer provides an `embassy-time-queue-driver` implementation
- Added `TaskRef::executor` to obtain a reference to a task's executor
- integrated-timers are no longer processed when polling the executor.
- Added the option to store data in timer queue items

## 0.6.3 - 2024-11-12

Expand Down
16 changes: 16 additions & 0 deletions embassy-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ trace = []
## Enable support for rtos-trace framework
rtos-trace = ["dep:rtos-trace", "trace", "dep:embassy-time-driver"]

#! ### Timer Item Payload Size
#! Sets the size of the payload for timer items, allowing integrated timer implementors to store
#! additional data in the timer item. The payload field will be aligned to this value as well.
#! If these features are not defined, the timer item will contain no payload field.

_timer-item-payload = [] # A size was picked

## 1 bytes
timer-item-payload-size-1 = ["_timer-item-payload"]
## 2 bytes
timer-item-payload-size-2 = ["_timer-item-payload"]
## 4 bytes
timer-item-payload-size-4 = ["_timer-item-payload"]
## 8 bytes
timer-item-payload-size-8 = ["_timer-item-payload"]

#! ### Task Arena Size
#! Sets the [task arena](#task-arena) size. Necessary if you’re not using `nightly`.
#!
Expand Down
45 changes: 0 additions & 45 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,6 @@ impl TaskRef {
&self.header().timer_queue_item
}

/// Mark the task as timer-queued. Return whether it should be actually enqueued
/// using `_embassy_time_schedule_wake`.
///
/// Entering this state prevents the task from being respawned while in a timer queue.
///
/// Safety:
///
/// This functions should only be called by the timer queue driver, before
/// enqueueing the timer item.
pub unsafe fn timer_enqueue(&self) -> timer_queue::TimerEnqueueOperation {
self.header().state.timer_enqueue()
}

/// Unmark the task as timer-queued.
///
/// Safety:
///
/// This functions should only be called by the timer queue implementation, after the task has
/// been removed from the timer queue.
pub unsafe fn timer_dequeue(&self) {
self.header().state.timer_dequeue()
}

/// The returned pointer is valid for the entire TaskStorage.
pub(crate) fn as_ptr(self) -> *const TaskHeader {
self.ptr.as_ptr()
Expand Down Expand Up @@ -195,25 +172,7 @@ impl<F: Future + 'static> TaskStorage<F> {
match future.poll(&mut cx) {
Poll::Ready(_) => {
this.future.drop_in_place();

// Mark this task to be timer queued.
// We're splitting the enqueue in two parts, so that we can change task state
// to something that prevent re-queueing.
let op = this.raw.state.timer_enqueue();

// Now mark the task as not spawned, so that
// - it can be spawned again once it has been removed from the timer queue
// - it can not be timer-queued again
// We must do this before scheduling the wake, to prevent the task from being
// dequeued by the time driver while it's still SPAWNED.
this.raw.state.despawn();

// Now let's finish enqueueing. While we shouldn't get an `Ignore` here, it's
// better to be safe.
if op == timer_queue::TimerEnqueueOperation::Enqueue {
// Schedule the task in the past, so it gets dequeued ASAP.
unsafe { _embassy_time_schedule_wake(0, &waker) }
}
}
Poll::Pending => {}
}
Expand All @@ -232,10 +191,6 @@ impl<F: Future + 'static> TaskStorage<F> {
}
}

extern "Rust" {
fn _embassy_time_schedule_wake(at: u64, waker: &core::task::Waker);
}

/// An uninitialized [`TaskStorage`].
pub struct AvailableTask<F: Future + 'static> {
task: &'static TaskStorage<F>,
Expand Down
32 changes: 0 additions & 32 deletions embassy-executor/src/raw/state_atomics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use core::sync::atomic::{AtomicU32, Ordering};

use super::timer_queue::TimerEnqueueOperation;

#[derive(Clone, Copy)]
pub(crate) struct Token(());

Expand All @@ -16,8 +14,6 @@ pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;

pub(crate) struct State {
state: AtomicU32,
Expand Down Expand Up @@ -71,32 +67,4 @@ impl State {
let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
state & STATE_SPAWNED != 0
}

/// Mark the task as timer-queued. Return whether it can be enqueued.
#[inline(always)]
pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
if self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
// If not started, ignore it
if state & STATE_SPAWNED == 0 {
None
} else {
// Mark it as enqueued
Some(state | STATE_TIMER_QUEUED)
}
})
.is_ok()
{
TimerEnqueueOperation::Enqueue
} else {
TimerEnqueueOperation::Ignore
}
}

/// Unmark the task as timer-queued.
#[inline(always)]
pub fn timer_dequeue(&self) {
self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::Relaxed);
}
}
36 changes: 2 additions & 34 deletions embassy-executor/src/raw/state_atomics_arm.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use core::arch::asm;
use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};

use super::timer_queue::TimerEnqueueOperation;

#[derive(Clone, Copy)]
pub(crate) struct Token(());

Expand All @@ -16,26 +14,24 @@ pub(crate) fn locked<R>(f: impl FnOnce(Token) -> R) -> R {
// Must be kept in sync with the layout of `State`!
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 16;

#[repr(C, align(4))]
pub(crate) struct State {
/// Task is spawned (has a future)
spawned: AtomicBool,
/// Task is in the executor run queue
run_queued: AtomicBool,
/// Task is in the executor timer queue
timer_queued: AtomicBool,
pad: AtomicBool,
pad2: AtomicBool,
}

impl State {
pub const fn new() -> State {
Self {
spawned: AtomicBool::new(false),
run_queued: AtomicBool::new(false),
timer_queued: AtomicBool::new(false),
pad: AtomicBool::new(false),
pad2: AtomicBool::new(false),
}
}

Expand Down Expand Up @@ -101,32 +97,4 @@ impl State {
self.run_queued.store(false, Ordering::Relaxed);
r
}

/// Mark the task as timer-queued. Return whether it can be enqueued.
#[inline(always)]
pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
if self
.as_u32()
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
// If not started, ignore it
if state & STATE_SPAWNED == 0 {
None
} else {
// Mark it as enqueued
Some(state | STATE_TIMER_QUEUED)
}
})
.is_ok()
{
TimerEnqueueOperation::Enqueue
} else {
TimerEnqueueOperation::Ignore
}
}

/// Unmark the task as timer-queued.
#[inline(always)]
pub fn timer_dequeue(&self) {
self.timer_queued.store(false, Ordering::Relaxed);
}
}
25 changes: 0 additions & 25 deletions embassy-executor/src/raw/state_critical_section.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ use core::cell::Cell;
pub(crate) use critical_section::{with as locked, CriticalSection as Token};
use critical_section::{CriticalSection, Mutex};

use super::timer_queue::TimerEnqueueOperation;

/// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
/// Task is in the executor timer queue
pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;

pub(crate) struct State {
state: Mutex<Cell<u32>>,
Expand Down Expand Up @@ -81,25 +77,4 @@ impl State {
ok
})
}

/// Mark the task as timer-queued. Return whether it can be enqueued.
#[inline(always)]
pub fn timer_enqueue(&self) -> TimerEnqueueOperation {
self.update(|s| {
// FIXME: we need to split SPAWNED into two phases, to prevent enqueueing a task that is
// just being spawned, because its executor pointer may still be changing.
if *s & STATE_SPAWNED == STATE_SPAWNED {
*s |= STATE_TIMER_QUEUED;
TimerEnqueueOperation::Enqueue
} else {
TimerEnqueueOperation::Ignore
}
})
}

/// Unmark the task as timer-queued.
#[inline(always)]
pub fn timer_dequeue(&self) {
self.update(|s| *s &= !STATE_TIMER_QUEUED);
}
}
56 changes: 45 additions & 11 deletions embassy-executor/src/raw/timer_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,45 @@ use core::cell::Cell;

use super::TaskRef;

#[cfg(feature = "_timer-item-payload")]
macro_rules! define_opaque {
($size:tt) => {
/// An opaque data type.
#[repr(align($size))]
pub struct OpaqueData {
data: [u8; $size],
}

impl OpaqueData {
const fn new() -> Self {
Self { data: [0; $size] }
}

/// Access the data as a reference to a type `T`.
///
/// Safety:
///
/// The caller must ensure that the size of the type `T` is less than, or equal to
/// the size of the payload, and must ensure that the alignment of the type `T` is
/// less than, or equal to the alignment of the payload.
///
/// The type must be valid when zero-initialized.
pub unsafe fn as_ref<T>(&self) -> &T {
&*(self.data.as_ptr() as *const T)
}
}
};
}

#[cfg(feature = "timer-item-payload-size-1")]
define_opaque!(1);
#[cfg(feature = "timer-item-payload-size-2")]
define_opaque!(2);
#[cfg(feature = "timer-item-payload-size-4")]
define_opaque!(4);
#[cfg(feature = "timer-item-payload-size-8")]
define_opaque!(8);

/// An item in the timer queue.
pub struct TimerQueueItem {
/// The next item in the queue.
Expand All @@ -14,6 +53,10 @@ pub struct TimerQueueItem {

/// The time at which this item expires.
pub expires_at: Cell<u64>,

/// Some implementation-defined, zero-initialized piece of data.
#[cfg(feature = "_timer-item-payload")]
pub payload: OpaqueData,
}

unsafe impl Sync for TimerQueueItem {}
Expand All @@ -23,17 +66,8 @@ impl TimerQueueItem {
Self {
next: Cell::new(None),
expires_at: Cell::new(0),
#[cfg(feature = "_timer-item-payload")]
payload: OpaqueData::new(),
}
}
}

/// The operation to perform after `timer_enqueue` is called.
#[derive(Debug, Copy, Clone, PartialEq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[must_use]
pub enum TimerEnqueueOperation {
/// Enqueue the task (or update its expiration time).
Enqueue,
/// The task must not be enqueued in the timer queue.
Ignore,
}
4 changes: 0 additions & 4 deletions embassy-executor/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,3 @@ fn executor_task_cfg_args() {
let (_, _, _) = (a, b, c);
}
}

// We need this for the test to compile, even though we don't want to use timers at the moment.
#[no_mangle]
fn _embassy_time_schedule_wake(_at: u64, _waker: &core::task::Waker) {}
9 changes: 9 additions & 0 deletions embassy-time-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
//!
//! Then, you'll need to adapt the `schedule_wake` method to use this queue.
//!
//! Note that if you are using multiple queues, you will need to ensure that a single timer
//! queue item is only ever enqueued into a single queue at a time.
//!
//! ```ignore
//! use core::cell::RefCell;
//! use core::task::Waker;
Expand Down Expand Up @@ -131,13 +134,19 @@ pub trait Driver: Send + Sync + 'static {

extern "Rust" {
fn _embassy_time_now() -> u64;
fn _embassy_time_schedule_wake(at: u64, waker: &Waker);
}

/// See [`Driver::now`]
pub fn now() -> u64 {
unsafe { _embassy_time_now() }
}

/// Schedule the given waker to be woken at `at`.
pub fn schedule_wake(at: u64, waker: &Waker) {
unsafe { _embassy_time_schedule_wake(at, waker) }
}

/// Set the time Driver implementation.
///
/// See the module documentation for an example.
Expand Down
Loading

0 comments on commit c504ae8

Please sign in to comment.