Skip to content

Commit

Permalink
Merge pull request #277 from mahkoh/jorth/async-text
Browse files Browse the repository at this point in the history
text: render text asynchronously
  • Loading branch information
mahkoh authored Sep 30, 2024
2 parents f004afd + 12f358c commit 6ec2e9a
Show file tree
Hide file tree
Showing 30 changed files with 1,274 additions and 568 deletions.
30 changes: 29 additions & 1 deletion src/async_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct AsyncEngine {
yield_stash: RefCell<VecDeque<Waker>>,
stopped: Cell<bool>,
now: Cell<Option<Time>>,
#[cfg(feature = "it")]
idle: Cell<Option<Waker>>,
}

impl AsyncEngine {
Expand All @@ -48,6 +50,8 @@ impl AsyncEngine {
yield_stash: Default::default(),
stopped: Cell::new(false),
now: Default::default(),
#[cfg(feature = "it")]
idle: Default::default(),
})
}

Expand Down Expand Up @@ -91,7 +95,15 @@ impl AsyncEngine {
pub fn dispatch(&self) {
let mut stash = self.stash.borrow_mut();
let mut yield_stash = self.yield_stash.borrow_mut();
while self.num_queued.get() > 0 {
loop {
if self.num_queued.get() == 0 {
#[cfg(feature = "it")]
if let Some(idle) = self.idle.take() {
idle.wake();
continue;
}
break;
}
self.now.take();
self.iteration.fetch_add(1);
let mut phase = 0;
Expand All @@ -116,6 +128,22 @@ impl AsyncEngine {
}
}

#[cfg(feature = "it")]
pub async fn idle(&self) {
use std::{future::poll_fn, task::Poll};
let mut register = true;
poll_fn(|ctx| {
if register {
self.idle.set(Some(ctx.waker().clone()));
register = false;
Poll::Pending
} else {
Poll::Ready(())
}
})
.await
}

fn push(&self, runnable: Runnable, phase: Phase) {
self.queues[phase as usize].push(runnable);
self.num_queued.fetch_add(1);
Expand Down
24 changes: 24 additions & 0 deletions src/clientmem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use {
crate::{
client::Client,
cpu_worker::{AsyncCpuWork, CpuJob, CpuWork, CpuWorker},
gfx_api::{ShmMemory, ShmMemoryBacking},
utils::vec_ext::VecExt,
},
std::{
cell::Cell,
error::Error,
mem::{ManuallyDrop, MaybeUninit},
ops::Deref,
ptr,
rc::Rc,
sync::atomic::{compiler_fence, Ordering},
Expand Down Expand Up @@ -105,6 +108,7 @@ impl ClientMem {
}
}

#[expect(dead_code)]
pub fn fd(&self) -> &Rc<OwnedFd> {
&self.fd
}
Expand All @@ -115,14 +119,17 @@ impl ClientMem {
}

impl ClientMemOffset {
#[expect(dead_code)]
pub fn pool(&self) -> &ClientMem {
&self.mem
}

#[expect(dead_code)]
pub fn offset(&self) -> usize {
self.offset
}

#[expect(dead_code)]
pub fn ptr(&self) -> *const [Cell<u8>] {
self.data
}
Expand Down Expand Up @@ -263,3 +270,20 @@ impl CpuWork for CloseMemWork {
None
}
}

impl ShmMemory for ClientMemOffset {
fn len(&self) -> usize {
self.data.len()
}

fn safe_access(&self) -> ShmMemoryBacking {
match self.mem.sigbus_impossible() {
true => ShmMemoryBacking::Ptr(self.data),
false => ShmMemoryBacking::Fd(self.mem.fd.deref().clone(), self.offset),
}
}

fn access(&self, f: &mut dyn FnMut(&[Cell<u8>])) -> Result<(), Box<dyn Error + Sync + Send>> {
self.access(f).map_err(|e| e.into())
}
}
26 changes: 19 additions & 7 deletions src/compositor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use {
tasks::{self, idle},
tracy::enable_profiler,
tree::{
container_layout, container_render_data, float_layout, float_titles,
output_render_data, DisplayNode, NodeIds, OutputNode, TearingMode, VrrMode,
WorkspaceNode,
container_layout, container_render_positions, container_render_titles, float_layout,
float_titles, output_render_data, placeholder_render_textures, DisplayNode, NodeIds,
OutputNode, TearingMode, VrrMode, WorkspaceNode,
},
user_session::import_environment,
utils::{
Expand Down Expand Up @@ -180,13 +180,15 @@ fn start_compositor2(
input_device_handlers: Default::default(),
theme: Default::default(),
pending_container_layout: Default::default(),
pending_container_render_data: Default::default(),
pending_container_render_positions: Default::default(),
pending_container_render_title: Default::default(),
pending_output_render_data: Default::default(),
pending_float_layout: Default::default(),
pending_float_titles: Default::default(),
pending_input_popup_positioning: Default::default(),
pending_toplevel_screencasts: Default::default(),
pending_screencast_reallocs_or_reconfigures: Default::default(),
pending_placeholder_render_textures: Default::default(),
dbus: Dbus::new(&engine, &ring, &run_toplevel),
fdcloser: FdCloser::new(),
logger: logger.clone(),
Expand Down Expand Up @@ -374,9 +376,19 @@ fn start_global_event_handlers(
container_layout(state.clone()),
),
eng.spawn2(
"container render",
"container render positions",
Phase::PostLayout,
container_render_data(state.clone()),
container_render_positions(state.clone()),
),
eng.spawn2(
"container titles",
Phase::PostLayout,
container_render_titles(state.clone()),
),
eng.spawn2(
"placeholder textures",
Phase::PostLayout,
placeholder_render_textures(state.clone()),
),
eng.spawn2(
"output render",
Expand Down Expand Up @@ -577,7 +589,7 @@ fn create_dummy_output(state: &Rc<State>) {
jay_workspaces: Default::default(),
may_capture: Cell::new(false),
has_capture: Cell::new(false),
title_texture: Cell::new(None),
title_texture: Default::default(),
attention_requests: Default::default(),
render_highlight: Default::default(),
});
Expand Down
13 changes: 8 additions & 5 deletions src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
output_schedule::map_cursor_hz,
scale::Scale,
state::{ConnectorData, DeviceHandlerData, DrmDevData, OutputData, State},
theme::{Color, ThemeSized, DEFAULT_FONT},
theme::{Color, ThemeSized},
tree::{
move_ws_to_output, ContainerNode, ContainerSplit, FloatNode, Node, NodeVisitorBase,
OutputNode, TearingMode, VrrMode, WsMoveConfig,
Expand Down Expand Up @@ -57,7 +57,7 @@ use {
},
libloading::Library,
log::Level,
std::{cell::Cell, ops::Deref, rc::Rc, time::Duration},
std::{cell::Cell, ops::Deref, rc::Rc, sync::Arc, time::Duration},
thiserror::Error,
uapi::{c, fcntl_dupfd_cloexec, OwnedFd},
};
Expand Down Expand Up @@ -1525,15 +1525,18 @@ impl ConfigProxyHandler {
}

fn handle_reset_font(&self) {
*self.state.theme.font.borrow_mut() = DEFAULT_FONT.to_string();
self.state
.theme
.font
.set(self.state.theme.default_font.clone());
}

fn handle_set_font(&self, font: &str) {
*self.state.theme.font.borrow_mut() = font.to_string();
self.state.theme.font.set(Arc::new(font.to_string()));
}

fn handle_get_font(&self) {
let font = self.state.theme.font.borrow_mut().clone();
let font = self.state.theme.font.get().to_string();
self.respond(Response::GetFont { font });
}

Expand Down
60 changes: 48 additions & 12 deletions src/cpu_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
ptr_ext::MutPtrExt, queue::AsyncQueue, stack::Stack,
},
},
parking_lot::Mutex,
parking_lot::{Condvar, Mutex},
std::{
any::Any,
cell::{Cell, RefCell},
Expand Down Expand Up @@ -113,18 +113,25 @@ enum Job {

unsafe impl Send for Job {}

#[derive(Default)]
struct CompletedJobsExchange {
queue: VecDeque<CpuJobId>,
condvar: Option<Arc<Condvar>>,
}

struct CpuWorkerData {
next: CpuJobIds,
jobs_to_enqueue: AsyncQueue<Job>,
new_jobs: Arc<Mutex<VecDeque<Job>>>,
have_new_jobs: Rc<OwnedFd>,
completed_jobs_remote: Arc<Mutex<VecDeque<CpuJobId>>>,
completed_jobs_remote: Arc<Mutex<CompletedJobsExchange>>,
completed_jobs_local: RefCell<VecDeque<CpuJobId>>,
have_completed_jobs: Rc<OwnedFd>,
pending_jobs: CopyHashMap<CpuJobId, Rc<PendingJobData>>,
ring: Rc<IoUring>,
_stop: OwnedFd,
pending_job_data_cache: Stack<Rc<PendingJobData>>,
sync_wake_condvar: Arc<Condvar>,
}

linear_ids!(CpuJobIds, CpuJobId, u64);
Expand Down Expand Up @@ -172,12 +179,16 @@ impl Drop for PendingJob {
self.job_data.state.set(PendingJobState::Abandoned);
data.jobs_to_enqueue.push(Job::Cancel { id });
data.do_equeue_jobs();
let mut buf = 0u64;
while data.pending_jobs.contains(&id) {
if let Err(e) = uapi::read(data.have_completed_jobs.raw(), &mut buf) {
panic!("Could not wait for job completions: {}", ErrorFmt(e));
}
loop {
data.dispatch_completions();
if !data.pending_jobs.contains(&id) {
break;
}
let mut remote = data.completed_jobs_remote.lock();
while remote.queue.is_empty() {
remote.condvar = Some(data.sync_wake_condvar.clone());
data.sync_wake_condvar.wait(&mut remote);
}
}
}
PendingJobState::Abandoned => {}
Expand All @@ -204,7 +215,7 @@ impl CpuWorkerData {

fn dispatch_completions(&self) {
let completions = &mut *self.completed_jobs_local.borrow_mut();
mem::swap(completions, &mut *self.completed_jobs_remote.lock());
mem::swap(completions, &mut self.completed_jobs_remote.lock().queue);
while let Some(id) = completions.pop_front() {
let job_data = self.pending_jobs.remove(&id).unwrap();
let job = job_data.job.take().unwrap();
Expand Down Expand Up @@ -242,7 +253,7 @@ impl CpuWorkerData {
impl CpuWorker {
pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Result<Self, CpuWorkerError> {
let new_jobs: Arc<Mutex<VecDeque<Job>>> = Default::default();
let completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>> = Default::default();
let completed_jobs: Arc<Mutex<CompletedJobsExchange>> = Default::default();
let (stop_read, stop_write) =
uapi::pipe2(c::O_CLOEXEC).map_err(|e| CpuWorkerError::Pipe(e.into()))?;
let have_new_jobs =
Expand Down Expand Up @@ -281,6 +292,7 @@ impl CpuWorker {
ring: ring.clone(),
_stop: stop_read,
pending_job_data_cache: Default::default(),
sync_wake_condvar: Arc::new(Condvar::new()),
});
Ok(Self {
_completions_listener: eng.spawn(
Expand Down Expand Up @@ -309,11 +321,28 @@ impl CpuWorker {
job_data,
}
}

#[cfg(feature = "it")]
pub fn wait_idle(&self) -> bool {
let was_idle = self.data.pending_jobs.is_empty();
loop {
self.data.dispatch_completions();
if self.data.pending_jobs.is_empty() {
break;
}
let mut remote = self.data.completed_jobs_remote.lock();
while remote.queue.is_empty() {
remote.condvar = Some(self.data.sync_wake_condvar.clone());
self.data.sync_wake_condvar.wait(&mut remote);
}
}
was_idle
}
}

fn work(
new_jobs: Arc<Mutex<VecDeque<Job>>>,
completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>>,
completed_jobs: Arc<Mutex<CompletedJobsExchange>>,
stop: OwnedFd,
have_new_jobs: OwnedFd,
have_completed_jobs: OwnedFd,
Expand Down Expand Up @@ -343,7 +372,7 @@ fn work(
struct Worker {
eng: Rc<AsyncEngine>,
ring: Rc<IoUring>,
completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>>,
completed_jobs: Arc<Mutex<CompletedJobsExchange>>,
have_completed_jobs: OwnedFd,
async_jobs: CopyHashMap<CpuJobId, AsyncJob>,
stopped: Cell<bool>,
Expand Down Expand Up @@ -428,7 +457,14 @@ impl Worker {
}

fn send_completion(&self, id: CpuJobId) {
self.completed_jobs.lock().push_back(id);
let cv = {
let mut exchange = self.completed_jobs.lock();
exchange.queue.push_back(id);
exchange.condvar.take()
};
if let Some(cv) = cv {
cv.notify_all();
}
if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) {
panic!("Could not signal job completion: {}", ErrorFmt(e));
}
Expand Down
Loading

0 comments on commit 6ec2e9a

Please sign in to comment.