Skip to content

Commit

Permalink
Only lock once to wake a task
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Dec 13, 2024
1 parent 2474fbf commit 7500f0c
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 40 deletions.
20 changes: 11 additions & 9 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,11 @@ impl SyncExecutor {
/// - `task` must be set up to run in this executor.
/// - `task` must NOT be already enqueued (in this executor or another one).
#[inline(always)]
unsafe fn enqueue(&self, task: TaskRef) {
unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
#[cfg(feature = "trace")]
trace::task_ready_begin(self, &task);

if self.run_queue.enqueue(task) {
if self.run_queue.enqueue(task, l) {
self.pender.pend();
}
}
Expand All @@ -378,7 +378,9 @@ impl SyncExecutor {
#[cfg(feature = "trace")]
trace::task_new(self, &task);

self.enqueue(task);
state::locked(|l| {
self.enqueue(task, l);
})
}

/// # Safety
Expand Down Expand Up @@ -521,25 +523,25 @@ impl Executor {
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
header.state.run_enqueue(|l| {
// We have just marked the task as scheduled, so enqueue it.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.enqueue(task);
executor.enqueue(task, l);
}
}
});
}

/// Wake a task by `TaskRef` without calling pend.
///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task_no_pend(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
header.state.run_enqueue(|l| {
// We have just marked the task as scheduled, so enqueue it.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.run_queue.enqueue(task);
executor.run_queue.enqueue(task, l);
}
}
});
}
2 changes: 1 addition & 1 deletion embassy-executor/src/raw/run_queue_atomics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl RunQueue {
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
let mut was_empty = false;

self.head
Expand Down
10 changes: 4 additions & 6 deletions embassy-executor/src/raw/run_queue_critical_section.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ impl RunQueue {
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
critical_section::with(|cs| {
let prev = self.head.borrow(cs).replace(Some(task));
task.header().run_queue_item.next.borrow(cs).set(prev);
pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
let prev = self.head.borrow(cs).replace(Some(task));
task.header().run_queue_item.next.borrow(cs).set(prev);

prev.is_none()
})
prev.is_none()
}

/// Empty the queue, then call `on_task` for each task that was in the queue.
Expand Down
20 changes: 17 additions & 3 deletions embassy-executor/src/raw/state_atomics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ use core::sync::atomic::{AtomicU32, Ordering};

use super::timer_queue::TimerEnqueueOperation;

pub(crate) struct Token(());

/// Creates a token and passes it to the closure.
///
/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
pub(crate) fn locked(f: impl FnOnce(Token)) {
f(Token(()));
}

/// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
Expand Down Expand Up @@ -34,10 +43,12 @@ impl State {
self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
}

/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
self.state
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
if self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
// If already scheduled, or if not started,
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
Expand All @@ -48,6 +59,9 @@ impl State {
}
})
.is_ok()
{
locked(f);
}
}

/// Unmark the task as run-queued. Return whether the task is spawned.
Expand Down
19 changes: 15 additions & 4 deletions embassy-executor/src/raw/state_atomics_arm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};

use super::timer_queue::TimerEnqueueOperation;

pub(crate) struct Token(());

/// Creates a token and passes it to the closure.
///
/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
pub(crate) fn locked(f: impl FnOnce(Token)) {
f(Token(()));
}

// Must be kept in sync with the layout of `State`!
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
Expand Down Expand Up @@ -57,24 +66,26 @@ impl State {
self.spawned.store(false, Ordering::Relaxed);
}

/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
unsafe {
loop {
let state: u32;
asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack));

if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
asm!("clrex", options(nomem, nostack));
return false;
return;
}

let outcome: usize;
let new_state = state | STATE_RUN_QUEUED;
asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
if outcome == 0 {
return true;
locked(f);
return;
}
}
}
Expand Down
42 changes: 25 additions & 17 deletions embassy-executor/src/raw/state_critical_section.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::cell::Cell;

use critical_section::Mutex;
pub(crate) use critical_section::{with as locked, CriticalSection as Token};
use critical_section::{CriticalSection, Mutex};

use super::timer_queue::TimerEnqueueOperation;

Expand All @@ -23,13 +24,15 @@ impl State {
}

fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
critical_section::with(|cs| {
let s = self.state.borrow(cs);
let mut val = s.get();
let r = f(&mut val);
s.set(val);
r
})
critical_section::with(|cs| self.update_with_cs(cs, f))
}

fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R {
let s = self.state.borrow(cs);
let mut val = s.get();
let r = f(&mut val);
s.set(val);
r
}

/// If task is idle, mark it as spawned + run_queued and return true.
Expand All @@ -51,17 +54,22 @@ impl State {
self.update(|s| *s &= !STATE_SPAWNED);
}

/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
self.update(|s| {
if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
false
} else {
*s |= STATE_RUN_QUEUED;
true
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
critical_section::with(|cs| {
if self.update_with_cs(cs, |s| {
if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
false
} else {
*s |= STATE_RUN_QUEUED;
true
}
}) {
f(cs);
}
})
});
}

/// Unmark the task as run-queued. Return whether the task is spawned.
Expand Down

0 comments on commit 7500f0c

Please sign in to comment.