Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only lock once to wake a task #3644

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,11 @@ impl SyncExecutor {
/// - `task` must be set up to run in this executor.
/// - `task` must NOT be already enqueued (in this executor or another one).
#[inline(always)]
unsafe fn enqueue(&self, task: TaskRef) {
unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
#[cfg(feature = "trace")]
trace::task_ready_begin(self, &task);

if self.run_queue.enqueue(task) {
if self.run_queue.enqueue(task, l) {
self.pender.pend();
}
}
Expand All @@ -401,7 +401,9 @@ impl SyncExecutor {
#[cfg(feature = "trace")]
trace::task_new(self, &task);

self.enqueue(task);
state::locked(|l| {
self.enqueue(task, l);
})
}

/// # Safety
Expand Down Expand Up @@ -544,25 +546,25 @@ impl Executor {
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
header.state.run_enqueue(|l| {
// We have just marked the task as scheduled, so enqueue it.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.enqueue(task);
executor.enqueue(task, l);
}
}
});
}

/// Wake a task by `TaskRef` without calling pend.
///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task_no_pend(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
header.state.run_enqueue(|l| {
// We have just marked the task as scheduled, so enqueue it.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.run_queue.enqueue(task);
executor.run_queue.enqueue(task, l);
}
}
});
}
2 changes: 1 addition & 1 deletion embassy-executor/src/raw/run_queue_atomics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl RunQueue {
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
pub(crate) unsafe fn enqueue(&self, task: TaskRef, _: super::state::Token) -> bool {
let mut was_empty = false;

self.head
Expand Down
10 changes: 4 additions & 6 deletions embassy-executor/src/raw/run_queue_critical_section.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ impl RunQueue {
///
/// `item` must NOT be already enqueued in any queue.
#[inline(always)]
pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
critical_section::with(|cs| {
let prev = self.head.borrow(cs).replace(Some(task));
task.header().run_queue_item.next.borrow(cs).set(prev);
pub(crate) unsafe fn enqueue(&self, task: TaskRef, cs: CriticalSection<'_>) -> bool {
let prev = self.head.borrow(cs).replace(Some(task));
task.header().run_queue_item.next.borrow(cs).set(prev);

prev.is_none()
})
prev.is_none()
}

/// Empty the queue, then call `on_task` for each task that was in the queue.
Expand Down
20 changes: 17 additions & 3 deletions embassy-executor/src/raw/state_atomics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ use core::sync::atomic::{AtomicU32, Ordering};

use super::timer_queue::TimerEnqueueOperation;

pub(crate) struct Token(());

/// Creates a token and passes it to the closure.
///
/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
pub(crate) fn locked(f: impl FnOnce(Token)) {
f(Token(()));
}

/// Task is spawned (has a future)
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
/// Task is in the executor run queue
Expand Down Expand Up @@ -34,10 +43,12 @@ impl State {
self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
}

/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
self.state
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
if self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
// If already scheduled, or if not started,
if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
Expand All @@ -48,6 +59,9 @@ impl State {
}
})
.is_ok()
{
locked(f);
}
}

/// Unmark the task as run-queued. Return whether the task is spawned.
Expand Down
19 changes: 15 additions & 4 deletions embassy-executor/src/raw/state_atomics_arm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU32, Ordering};

use super::timer_queue::TimerEnqueueOperation;

pub(crate) struct Token(());

/// Creates a token and passes it to the closure.
///
/// This is a no-op replacement for `CriticalSection::with` because we don't need any locking.
pub(crate) fn locked(f: impl FnOnce(Token)) {
f(Token(()));
}

// Must be kept in sync with the layout of `State`!
pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 8;
Expand Down Expand Up @@ -57,24 +66,26 @@ impl State {
self.spawned.store(false, Ordering::Relaxed);
}

/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
unsafe {
loop {
let state: u32;
asm!("ldrex {}, [{}]", out(reg) state, in(reg) self, options(nostack));

if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
asm!("clrex", options(nomem, nostack));
return false;
return;
}

let outcome: usize;
let new_state = state | STATE_RUN_QUEUED;
asm!("strex {}, {}, [{}]", out(reg) outcome, in(reg) new_state, in(reg) self, options(nostack));
if outcome == 0 {
return true;
locked(f);
return;
}
}
}
Expand Down
42 changes: 25 additions & 17 deletions embassy-executor/src/raw/state_critical_section.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::cell::Cell;

use critical_section::Mutex;
pub(crate) use critical_section::{with as locked, CriticalSection as Token};
use critical_section::{CriticalSection, Mutex};

use super::timer_queue::TimerEnqueueOperation;

Expand All @@ -23,13 +24,15 @@ impl State {
}

fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
critical_section::with(|cs| {
let s = self.state.borrow(cs);
let mut val = s.get();
let r = f(&mut val);
s.set(val);
r
})
critical_section::with(|cs| self.update_with_cs(cs, f))
}

fn update_with_cs<R>(&self, cs: CriticalSection<'_>, f: impl FnOnce(&mut u32) -> R) -> R {
let s = self.state.borrow(cs);
let mut val = s.get();
let r = f(&mut val);
s.set(val);
r
}

/// If task is idle, mark it as spawned + run_queued and return true.
Expand All @@ -51,17 +54,22 @@ impl State {
self.update(|s| *s &= !STATE_SPAWNED);
}

/// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
/// Mark the task as run-queued if it's spawned and isn't already run-queued. Run the given
/// function if the task was successfully marked.
#[inline(always)]
pub fn run_enqueue(&self) -> bool {
self.update(|s| {
if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
false
} else {
*s |= STATE_RUN_QUEUED;
true
pub fn run_enqueue(&self, f: impl FnOnce(Token)) {
critical_section::with(|cs| {
if self.update_with_cs(cs, |s| {
if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
false
} else {
*s |= STATE_RUN_QUEUED;
true
}
}) {
f(cs);
}
})
});
}

/// Unmark the task as run-queued. Return whether the task is spawned.
Expand Down
Loading