diff --git a/src/allocator.rs b/src/allocator.rs
index 6354bdb7..f836a471 100644
--- a/src/allocator.rs
+++ b/src/allocator.rs
@@ -13,7 +13,7 @@ use {
 
 #[derive(Debug, Error)]
 #[error(transparent)]
-pub struct AllocatorError(#[from] pub Box<dyn Error>);
+pub struct AllocatorError(#[from] pub Box<dyn Error + Send>);
 
 bitflags! {
     BufferUsage: u32;
diff --git a/src/cli/input.rs b/src/cli/input.rs
index b41c2c4b..d48b5624 100644
--- a/src/cli/input.rs
+++ b/src/cli/input.rs
@@ -353,8 +353,10 @@ impl Input {
     async fn handle_keymap(&self, input: JayInputId) -> Vec<u8> {
         let data = Rc::new(RefCell::new(Vec::new()));
         jay_input::Keymap::handle(&self.tc, input, data.clone(), |d, map| {
-            let mem = Rc::new(ClientMem::new(map.keymap.raw(), map.keymap_len as _, true).unwrap())
-                .offset(0);
+            let mem = Rc::new(
+                ClientMem::new(&map.keymap, map.keymap_len as _, true, None, None).unwrap(),
+            )
+            .offset(0);
             mem.read(d.borrow_mut().deref_mut()).unwrap();
         });
         self.tc.round_trip().await;
diff --git a/src/clientmem.rs b/src/clientmem.rs
index f85dc5e7..3f435e0c 100644
--- a/src/clientmem.rs
+++ b/src/clientmem.rs
@@ -1,14 +1,21 @@
 use {
-    crate::utils::vec_ext::VecExt,
+    crate::{
+        client::Client,
+        cpu_worker::{AsyncCpuWork, CpuJob, CpuWork, CpuWorker},
+        utils::vec_ext::VecExt,
+    },
     std::{
         cell::Cell,
-        mem::MaybeUninit,
+        mem::{ManuallyDrop, MaybeUninit},
         ptr,
         rc::Rc,
         sync::atomic::{compiler_fence, Ordering},
     },
     thiserror::Error,
-    uapi::{c, c::raise},
+    uapi::{
+        c::{self, raise},
+        OwnedFd,
+    },
 };
 
 #[derive(Debug, Error)]
@@ -22,27 +29,45 @@ pub enum ClientMemError {
 }
 
 pub struct ClientMem {
+    fd: ManuallyDrop<Rc<OwnedFd>>,
     failed: Cell<bool>,
     sigbus_impossible: bool,
     data: *const [Cell<u8>],
+    cpu: Option<Rc<CpuWorker>>,
 }
 
 #[derive(Clone)]
 pub struct ClientMemOffset {
     mem: Rc<ClientMem>,
+    offset: usize,
     data: *const [Cell<u8>],
 }
 
 impl ClientMem {
-    pub fn new(fd: i32, len: usize, read_only: bool) -> Result<Self, ClientMemError> {
+    pub fn new(
+        fd: &Rc<OwnedFd>,
+        len: usize,
+        read_only: bool,
+        client: Option<&Client>,
+        cpu: Option<&Rc<CpuWorker>>,
+    ) -> Result<Self, ClientMemError> {
         let mut sigbus_impossible = false;
-        if let Ok(seals) = uapi::fcntl_get_seals(fd) {
+        if let Ok(seals) = uapi::fcntl_get_seals(fd.raw()) {
             if seals & c::F_SEAL_SHRINK != 0 {
-                if let Ok(stat) = uapi::fstat(fd) {
+                if let Ok(stat) = uapi::fstat(fd.raw()) {
                     sigbus_impossible = stat.st_size as u64 >= len as u64;
                 }
             }
         }
+        if !sigbus_impossible {
+            if let Some(client) = client {
+                log::debug!(
+                    "Client {} ({}) has created a shm buffer that might cause SIGBUS",
+                    client.pid_info.comm,
+                    client.id,
+                );
+            }
+        }
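+        // mmap rejects zero-length mappings, so an empty pool is represented
+        // by an empty slice instead of a real mapping.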
         let data = if len == 0 {
             &mut [][..]
         } else {
@@ -51,7 +76,7 @@ impl ClientMem {
             let prot = match read_only {
                 true => c::PROT_READ,
                 false => c::PROT_READ | c::PROT_WRITE,
             };
             unsafe {
-                let data = c::mmap64(ptr::null_mut(), len, prot, c::MAP_SHARED, fd, 0);
+                let data = c::mmap64(ptr::null_mut(), len, prot, c::MAP_SHARED, fd.raw(), 0);
                 if data == c::MAP_FAILED {
                     return Err(ClientMemError::MmapFailed(uapi::Errno::default().into()));
                 }
@@ -59,9 +84,11 @@ impl ClientMem {
             }
         };
         Ok(Self {
+            fd: ManuallyDrop::new(fd.clone()),
             failed: Cell::new(false),
             sigbus_impossible,
             data,
+            cpu: cpu.cloned(),
         })
     }
 
@@ -73,12 +100,33 @@ impl ClientMem {
         let mem = unsafe { &*self.data };
         ClientMemOffset {
             mem: self.clone(),
+            offset,
             data: &mem[offset..],
         }
     }
+
+    pub fn fd(&self) -> &Rc<OwnedFd> {
+        &self.fd
+    }
+
+    pub fn sigbus_impossible(&self) -> bool {
+        self.sigbus_impossible
+    }
 }
 
 impl ClientMemOffset {
+    pub fn pool(&self) -> &ClientMem {
+        &self.mem
+    }
+
+    pub fn offset(&self) -> usize {
+        self.offset
+    }
+
+    pub fn ptr(&self) -> *const [Cell<u8>] {
+        self.data
+    }
+
     pub fn access<T, F: FnOnce(&[Cell<u8>]) -> T>(&self, f: F) -> Result<T, ClientMemError> {
         unsafe {
             if self.mem.sigbus_impossible {
@@ -114,8 +162,17 @@ impl ClientMemOffset {
 
 impl Drop for ClientMem {
     fn drop(&mut self) {
-        unsafe {
-            c::munmap(self.data as _, self.len());
+        let fd = unsafe { ManuallyDrop::take(&mut self.fd) };
+        if let Some(cpu) = &self.cpu {
+            let pending = cpu.submit(Box::new(CloseMemWork {
+                fd: Rc::try_unwrap(fd).ok(),
+                data: self.data,
+            }));
+            pending.detach();
+        } else {
+            unsafe {
+                c::munmap(self.data as _, self.len());
+            }
         }
     }
 }
@@ -178,3 +235,30 @@ pub fn init() -> Result<(), ClientMemError> {
         }
     }
 }
+
+struct CloseMemWork {
+    fd: Option<OwnedFd>,
+    data: *const [Cell<u8>],
+}
+
+unsafe impl Send for CloseMemWork {}
+
+impl CpuJob for CloseMemWork {
+    fn work(&mut self) -> &mut dyn CpuWork {
+        self
+    }
+
+    fn completed(self: Box<Self>) {
+        // nothing
+    }
+}
+
+impl CpuWork for CloseMemWork {
+    fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
+        self.fd.take();
+        unsafe {
+            c::munmap(self.data as _, self.data.len());
+        }
+        None
+    }
+}
diff --git a/src/compositor.rs b/src/compositor.rs
index ad5d5720..1a93f071 100644
--- a/src/compositor.rs
+++ b/src/compositor.rs
@@ -13,6 +13,7 @@ use {
         client::{ClientId, Clients},
         clientmem::{self, ClientMemError},
         config::ConfigProxy,
+        cpu_worker::{CpuWorker, CpuWorkerError},
         damage::{visualize_damage, DamageVisualizer},
         dbus::Dbus,
         ei::ei_client::EiClients,
@@ -107,6 +108,8 @@ pub enum CompositorError {
     WheelError(#[from] WheelError),
     #[error("Could not create an io-uring")]
     IoUringError(#[from] IoUringError),
+    #[error("Could not create cpu worker")]
+    CpuWorkerError(#[from] CpuWorkerError),
 }
 
 pub const WAYLAND_DISPLAY: &str = "WAYLAND_DISPLAY";
@@ -143,6 +146,7 @@ fn start_compositor2(
     let node_ids = NodeIds::default();
     let scales = RefCounted::default();
     scales.add(Scale::from_int(1));
+    let cpu_worker = Rc::new(CpuWorker::new(&ring, &engine)?);
     let state = Rc::new(State {
         xkb_ctx,
         backend: CloneCell::new(Rc::new(DummyBackend)),
@@ -258,6 +262,7 @@ fn start_compositor2(
         enable_ei_acceptor: Default::default(),
         ei_clients: EiClients::new(),
         slow_ei_clients: Default::default(),
+        cpu_worker,
     });
     state.tracker.register(ClientId::from_raw(0));
     create_dummy_output(&state);
diff --git a/src/cpu_worker.rs b/src/cpu_worker.rs
new file mode 100644
index 00000000..ec360aa0
--- /dev/null
+++ b/src/cpu_worker.rs
@@ -0,0 +1,430 @@
+pub mod jobs;
+#[cfg(test)]
+mod tests;
+
+use {
+    crate::{
+        async_engine::{AsyncEngine, SpawnedFuture},
+        io_uring::IoUring,
+        utils::{
+            buf::TypedBuf, copyhashmap::CopyHashMap, errorfmt::ErrorFmt, oserror::OsError,
+            ptr_ext::MutPtrExt, queue::AsyncQueue, stack::Stack,
+        },
+    },
+    parking_lot::Mutex,
+    std::{
+        any::Any,
+        cell::{Cell, RefCell},
+        collections::VecDeque,
+        mem,
+        ptr::NonNull,
+        rc::Rc,
+        sync::Arc,
+        thread,
+    },
+    thiserror::Error,
+    uapi::{c, OwnedFd},
+};
+
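+/// A job that can be submitted to a [`CpuWorker`].
+///
+/// `work` runs on the worker thread; `completed` is invoked on the main
+/// thread once the work (including any follow-up async work) has finished.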
+pub trait CpuJob {
+    fn work(&mut self) -> &mut dyn CpuWork;
+    fn completed(self: Box<Self>);
+}
+
+pub trait CpuWork: Send {
+    fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>>;
+
+    fn cancel_async(&mut self, ring: &Rc<IoUring>) {
+        let _ = ring;
+        unreachable!();
+    }
+
+    fn async_work_done(&mut self, work: Box<dyn AsyncCpuWork>) {
+        let _ = work;
+        unreachable!();
+    }
+}
+
+pub trait AsyncCpuWork {
+    fn run(
+        self: Box<Self>,
+        eng: &Rc<AsyncEngine>,
+        ring: &Rc<IoUring>,
+        completion: WorkCompletion,
+    ) -> SpawnedFuture<CompletedWork>;
+
+    fn into_any(self: Box<Self>) -> Box<dyn Any>;
+}
+
+pub struct WorkCompletion {
+    worker: Rc<Worker>,
+    id: CpuJobId,
+}
+
+pub struct CompletedWork(());
+
+impl WorkCompletion {
+    pub fn complete(self, work: Box<dyn AsyncCpuWork>) -> CompletedWork {
+        let job = self.worker.async_jobs.remove(&self.id).unwrap();
+        unsafe {
+            job.work.deref_mut().async_work_done(work);
+        }
+        self.worker.send_completion(self.id);
+        CompletedWork(())
+    }
+}
+
+pub struct CpuWorker {
+    data: Rc<CpuWorkerData>,
+    _completions_listener: SpawnedFuture<()>,
+    _job_enqueuer: SpawnedFuture<()>,
+}
+
+#[must_use]
+pub struct PendingJob {
+    id: CpuJobId,
+    thread_data: Rc<CpuWorkerData>,
+    job_data: Rc<PendingJobData>,
+}
+
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
+enum PendingJobState {
+    #[default]
+    Waiting,
+    Abandoned,
+    Completed,
+}
+
+#[derive(Default)]
+struct PendingJobData {
+    job: Cell<Option<NonNull<dyn CpuJob>>>,
+    state: Cell<PendingJobState>,
+}
+
+enum Job {
+    New {
+        id: CpuJobId,
+        work: *mut dyn CpuWork,
+    },
+    Cancel {
+        id: CpuJobId,
+    },
+}
+
+unsafe impl Send for Job {}
+
+struct CpuWorkerData {
+    next: CpuJobIds,
+    jobs_to_enqueue: AsyncQueue<Job>,
+    new_jobs: Arc<Mutex<VecDeque<Job>>>,
+    have_new_jobs: Rc<OwnedFd>,
+    completed_jobs_remote: Arc<Mutex<VecDeque<CpuJobId>>>,
+    completed_jobs_local: RefCell<VecDeque<CpuJobId>>,
+    have_completed_jobs: Rc<OwnedFd>,
+    pending_jobs: CopyHashMap<CpuJobId, Rc<PendingJobData>>,
+    ring: Rc<IoUring>,
+    _stop: OwnedFd,
+    pending_job_data_cache: Stack<Rc<PendingJobData>>,
+}
+
+linear_ids!(CpuJobIds, CpuJobId, u64);
+
+#[derive(Debug, Error)]
+pub enum CpuWorkerError {
+    #[error("Could not create a pipe")]
+    Pipe(#[source] OsError),
+    #[error("Could not create an eventfd")]
+    EventFd(#[source] OsError),
+    #[error("Could not dup an eventfd")]
+    Dup(#[source] OsError),
+}
+
+impl PendingJob {
+    pub fn detach(self) {
+        match self.job_data.state.get() {
+            PendingJobState::Waiting => {
+                self.job_data.state.set(PendingJobState::Abandoned);
+            }
+            PendingJobState::Abandoned => {
+                unreachable!();
+            }
+            PendingJobState::Completed => {}
+        }
+    }
+}
+
+impl Drop for CpuWorker {
+    fn drop(&mut self) {
+        self.data.do_enqueue_jobs();
+        if self.data.pending_jobs.is_not_empty() {
+            log::warn!("CpuWorker dropped with pending jobs. Completed jobs will not be triggered.")
+        }
+    }
+}
+
+impl Drop for PendingJob {
+    fn drop(&mut self) {
+        match self.job_data.state.get() {
+            PendingJobState::Waiting => {
+                log::warn!("PendingJob dropped before completion. Blocking.");
Blocking."); + let data = &self.thread_data; + let id = self.id; + self.job_data.state.set(PendingJobState::Abandoned); + data.jobs_to_enqueue.push(Job::Cancel { id }); + data.do_equeue_jobs(); + let mut buf = 0u64; + while data.pending_jobs.contains(&id) { + if let Err(e) = uapi::read(data.have_completed_jobs.raw(), &mut buf) { + panic!("Could not wait for job completions: {}", ErrorFmt(e)); + } + data.dispatch_completions(); + } + } + PendingJobState::Abandoned => {} + PendingJobState::Completed => { + self.thread_data + .pending_job_data_cache + .push(self.job_data.clone()); + } + } + } +} + +impl CpuWorkerData { + async fn wait_for_completions(self: Rc) { + let mut buf = TypedBuf::::new(); + loop { + if let Err(e) = self.ring.read(&self.have_completed_jobs, buf.buf()).await { + log::error!("Could not wait for job completions: {}", ErrorFmt(e)); + return; + } + self.dispatch_completions(); + } + } + + fn dispatch_completions(&self) { + let completions = &mut *self.completed_jobs_local.borrow_mut(); + mem::swap(completions, &mut *self.completed_jobs_remote.lock()); + while let Some(id) = completions.pop_front() { + let job_data = self.pending_jobs.remove(&id).unwrap(); + let job = job_data.job.take().unwrap(); + let job = unsafe { Box::from_raw(job.as_ptr()) }; + match job_data.state.get() { + PendingJobState::Waiting => { + job_data.state.set(PendingJobState::Completed); + job.completed(); + } + PendingJobState::Abandoned => { + self.pending_job_data_cache.push(job_data); + } + PendingJobState::Completed => { + unreachable!(); + } + } + } + } + + async fn equeue_jobs(self: Rc) { + loop { + self.jobs_to_enqueue.non_empty().await; + self.do_equeue_jobs(); + } + } + + fn do_equeue_jobs(&self) { + self.jobs_to_enqueue.move_to(&mut self.new_jobs.lock()); + if let Err(e) = uapi::eventfd_write(self.have_new_jobs.raw(), 1) { + panic!("Could not signal eventfd: {}", ErrorFmt(e)); + } + } +} + +impl CpuWorker { + pub fn new(ring: &Rc, eng: &Rc) -> Result { + let new_jobs: Arc>> = Default::default(); + let completed_jobs: Arc>> = Default::default(); + let (stop_read, stop_write) = + uapi::pipe2(c::O_CLOEXEC).map_err(|e| CpuWorkerError::Pipe(e.into()))?; + let have_new_jobs = + uapi::eventfd(0, c::EFD_CLOEXEC).map_err(|e| CpuWorkerError::EventFd(e.into()))?; + let have_completed_jobs = + uapi::eventfd(0, c::EFD_CLOEXEC).map_err(|e| CpuWorkerError::EventFd(e.into()))?; + thread::Builder::new() + .name("cpu worker".to_string()) + .spawn({ + let new_jobs = new_jobs.clone(); + let completed_jobs = completed_jobs.clone(); + let have_new_jobs = uapi::fcntl_dupfd_cloexec(have_new_jobs.raw(), 0) + .map_err(|e| CpuWorkerError::Dup(e.into()))?; + let have_completed_jobs = uapi::fcntl_dupfd_cloexec(have_completed_jobs.raw(), 0) + .map_err(|e| CpuWorkerError::Dup(e.into()))?; + move || { + work( + new_jobs, + completed_jobs, + stop_write, + have_new_jobs, + have_completed_jobs, + ) + } + }) + .unwrap(); + let data = Rc::new(CpuWorkerData { + next: Default::default(), + jobs_to_enqueue: Default::default(), + new_jobs, + have_new_jobs: Rc::new(have_new_jobs), + completed_jobs_remote: completed_jobs, + completed_jobs_local: Default::default(), + have_completed_jobs: Rc::new(have_completed_jobs), + pending_jobs: Default::default(), + ring: ring.clone(), + _stop: stop_read, + pending_job_data_cache: Default::default(), + }); + Ok(Self { + _completions_listener: eng.spawn(data.clone().wait_for_completions()), + _job_enqueuer: eng.spawn(data.clone().equeue_jobs()), + data, + }) + } + + pub fn submit(&self, job: Box) 
+    pub fn submit(&self, job: Box<dyn CpuJob>) -> PendingJob {
+        let mut job = NonNull::from(Box::leak(job));
+        let id = self.data.next.next();
+        self.data.jobs_to_enqueue.push(Job::New {
+            id,
+            work: unsafe { job.as_mut().work() },
+        });
+        let job_data = self.data.pending_job_data_cache.pop().unwrap_or_default();
+        job_data.job.set(Some(job));
+        job_data.state.set(PendingJobState::Waiting);
+        self.data.pending_jobs.set(id, job_data.clone());
+        PendingJob {
+            id,
+            thread_data: self.data.clone(),
+            job_data,
+        }
+    }
+}
+
+fn work(
+    new_jobs: Arc<Mutex<VecDeque<Job>>>,
+    completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>>,
+    stop: OwnedFd,
+    have_new_jobs: OwnedFd,
+    have_completed_jobs: OwnedFd,
+) {
+    let eng = AsyncEngine::new();
+    let ring = IoUring::new(&eng, 32).unwrap();
+    let worker = Rc::new(Worker {
+        eng,
+        ring,
+        completed_jobs,
+        have_completed_jobs,
+        async_jobs: Default::default(),
+        stopped: Cell::new(false),
+    });
+    let _stop_listener = worker.eng.spawn(worker.clone().handle_stop(stop));
+    let _new_job_listener = worker
+        .eng
+        .spawn(worker.clone().handle_new_jobs(new_jobs, have_new_jobs));
+    if let Err(e) = worker.ring.run() {
+        panic!("io_uring failed: {}", ErrorFmt(e));
+    }
+}
+
+struct Worker {
+    eng: Rc<AsyncEngine>,
+    ring: Rc<IoUring>,
+    completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>>,
+    have_completed_jobs: OwnedFd,
+    async_jobs: CopyHashMap<CpuJobId, AsyncJob>,
+    stopped: Cell<bool>,
+}
+
+struct AsyncJob {
+    _future: SpawnedFuture<CompletedWork>,
+    work: *mut dyn CpuWork,
+}
+
+impl Worker {
+    async fn handle_stop(self: Rc<Self>, stop: OwnedFd) {
+        let stop = Rc::new(stop);
+        if let Err(e) = self.ring.poll(&stop, 0).await {
+            log::error!(
+                "Could not wait for stop fd to become readable: {}",
+                ErrorFmt(e)
+            );
+        } else {
+            assert!(self.async_jobs.is_empty());
+            self.stopped.set(true);
+            self.ring.stop();
+        }
+    }
+
+    async fn handle_new_jobs(
+        self: Rc<Self>,
+        jobs_remote: Arc<Mutex<VecDeque<Job>>>,
+        new_jobs: OwnedFd,
+    ) {
+        let mut buf = TypedBuf::<u64>::new();
+        let new_jobs = Rc::new(new_jobs);
+        let mut jobs = VecDeque::new();
+        loop {
+            if let Err(e) = self.ring.read(&new_jobs, buf.buf()).await {
+                if self.stopped.get() {
+                    return;
+                }
+                panic!(
+                    "Could not wait for new jobs fd to be signaled: {}",
+                    ErrorFmt(e),
+                );
+            }
+            mem::swap(&mut jobs, &mut *jobs_remote.lock());
+            while let Some(job) = jobs.pop_front() {
+                self.handle_new_job(job);
+            }
+        }
+    }
+
+    fn handle_new_job(self: &Rc<Self>, job: Job) {
+        match job {
+            Job::Cancel { id } => {
+                let mut jobs = self.async_jobs.lock();
+                if let Some(job) = jobs.get_mut(&id) {
+                    unsafe {
+                        job.work.deref_mut().cancel_async(&self.ring);
+                    }
+                }
+            }
+            Job::New { id, work } => match unsafe { work.deref_mut() }.run() {
+                None => {
+                    self.send_completion(id);
+                    return;
+                }
+                Some(w) => {
+                    let completion = WorkCompletion {
+                        worker: self.clone(),
+                        id,
+                    };
+                    let future = w.run(&self.eng, &self.ring, completion);
+                    self.async_jobs.set(
+                        id,
+                        AsyncJob {
+                            _future: future,
+                            work,
+                        },
+                    );
+                }
+            },
+        }
+    }
+
+    fn send_completion(&self, id: CpuJobId) {
+        self.completed_jobs.lock().push_back(id);
+        if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) {
+            panic!("Could not signal job completion: {}", ErrorFmt(e));
+        }
+    }
+}
diff --git a/src/cpu_worker/jobs.rs b/src/cpu_worker/jobs.rs
new file mode 100644
index 00000000..63b5b257
--- /dev/null
+++ b/src/cpu_worker/jobs.rs
@@ -0,0 +1,2 @@
+pub mod img_copy;
+pub mod read_write;
diff --git a/src/cpu_worker/jobs/img_copy.rs b/src/cpu_worker/jobs/img_copy.rs
new file mode 100644
index 00000000..414c1afe
--- /dev/null
+++ b/src/cpu_worker/jobs/img_copy.rs
@@ -0,0 +1,62 @@
+use {
+    crate::{
+        cpu_worker::{AsyncCpuWork, CpuWork},
+        rect::Rect,
+    },
+    std::ptr,
+};
+
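+/// Work item that copies a set of damage rectangles between two image buffers
+/// that share the same width, stride, and bytes per pixel.
+///
+/// The caller must keep the `src` and `dst` allocations alive and large
+/// enough for the whole run, which is why `new` is unsafe.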
+#[expect(clippy::manual_non_exhaustive)]
+pub struct ImgCopyWork {
+    pub src: *mut u8,
+    pub dst: *mut u8,
+    pub width: i32,
+    pub stride: i32,
+    pub bpp: i32,
+    pub rects: Vec<Rect>,
+    _priv: (),
+}
+
+unsafe impl Send for ImgCopyWork {}
+
+impl ImgCopyWork {
+    pub unsafe fn new() -> Self {
+        Self {
+            src: ptr::null_mut(),
+            dst: ptr::null_mut(),
+            width: 0,
+            stride: 0,
+            bpp: 0,
+            rects: vec![],
+            _priv: (),
+        }
+    }
+}
+
+impl CpuWork for ImgCopyWork {
+    fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
+        for rect in &self.rects {
+            let mut offset = rect.y1() * self.stride + rect.x1() * self.bpp;
+            if rect.width() == self.width {
+                let offset = offset as usize;
+                let len = rect.height() * self.stride;
+                unsafe {
+                    ptr::copy_nonoverlapping(self.src.add(offset), self.dst.add(offset), len as _);
+                }
+            } else {
+                let len = rect.width() * self.bpp;
+                for _ in 0..rect.height() {
+                    unsafe {
+                        ptr::copy_nonoverlapping(
+                            self.src.add(offset as _),
+                            self.dst.add(offset as _),
+                            len as _,
+                        );
+                    }
+                    offset += self.stride;
+                }
+            }
+        }
+        None
+    }
+}
diff --git a/src/cpu_worker/jobs/read_write.rs b/src/cpu_worker/jobs/read_write.rs
new file mode 100644
index 00000000..791ee22f
--- /dev/null
+++ b/src/cpu_worker/jobs/read_write.rs
@@ -0,0 +1,151 @@
+use {
+    crate::{
+        async_engine::{AsyncEngine, SpawnedFuture},
+        cpu_worker::{AsyncCpuWork, CompletedWork, CpuWork, WorkCompletion},
+        io_uring::{IoUring, IoUringError, IoUringTaskId},
+    },
+    std::{
+        any::Any,
+        ptr,
+        rc::Rc,
+        slice,
+        sync::{
+            atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
+            Arc,
+        },
+    },
+    thiserror::Error,
+    uapi::{c, Fd},
+};
+
+#[derive(Debug, Error)]
+pub enum ReadWriteJobError {
+    #[error("An io_uring error occurred")]
+    IoUring(#[source] IoUringError),
+    #[error("The job was cancelled")]
+    Cancelled,
+    #[error("Tried to operate outside the bounds of the file descriptor")]
+    OutOfBounds,
+}
+
+pub struct ReadWriteWork {
+    cancel: Arc<CancelState>,
+    config: Option<Box<ReadWriteWorkConfig>>,
+}
+
+unsafe impl Send for ReadWriteWork {}
+
+impl ReadWriteWork {
+    pub unsafe fn new() -> Self {
+        let cancel = Arc::new(CancelState::default());
+        ReadWriteWork {
+            cancel: cancel.clone(),
+            config: Some(Box::new(ReadWriteWorkConfig {
+                fd: -1,
+                offset: 0,
+                ptr: ptr::null_mut(),
+                len: 0,
+                write: false,
+                cancel,
+                result: None,
+            })),
+        }
+    }
+
+    pub fn config(&mut self) -> &mut ReadWriteWorkConfig {
+        self.config.as_mut().unwrap()
+    }
+}
+
+pub struct ReadWriteWorkConfig {
+    pub fd: c::c_int,
+    pub offset: usize,
+    pub ptr: *mut u8,
+    pub len: usize,
+    pub write: bool,
+    pub result: Option<Result<(), ReadWriteJobError>>,
+    cancel: Arc<CancelState>,
+}
+
+#[derive(Default)]
+struct CancelState {
+    cancelled: AtomicBool,
+    cancel_id: AtomicU64,
+}
+
+impl CpuWork for ReadWriteWork {
+    fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
+        self.cancel.cancelled.store(false, Relaxed);
+        self.cancel.cancel_id.store(0, Relaxed);
+        self.config.take().map(|b| b as _)
+    }
+
+    fn cancel_async(&mut self, ring: &Rc<IoUring>) {
+        self.cancel.cancelled.store(true, Relaxed);
+        let id = self.cancel.cancel_id.load(Relaxed);
+        if id != 0 {
+            ring.cancel(IoUringTaskId::from_raw(id));
+        }
+    }
+
+    fn async_work_done(&mut self, work: Box<dyn AsyncCpuWork>) {
+        let work = work.into_any().downcast().unwrap();
+        self.config = Some(work);
+    }
+}
+
+impl AsyncCpuWork for ReadWriteWorkConfig {
+    fn run(
+        mut self: Box<Self>,
+        eng: &Rc<AsyncEngine>,
+        ring: &Rc<IoUring>,
+        completion: WorkCompletion,
+    ) -> SpawnedFuture<CompletedWork> {
+        let ring = ring.clone();
+        eng.spawn(async move {
+            let res = loop {
+                if self.cancel.cancelled.load(Relaxed) {
+                    break Err(ReadWriteJobError::Cancelled);
+                }
+                if self.len == 0 {
+                    break Ok(());
+                };
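+                // A single read/write may transfer less than requested, so
+                // advance the cursor and loop until the range is exhausted.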
+                let res = if self.write {
+                    ring.write_no_cancel(
+                        Fd::new(self.fd),
+                        self.offset,
+                        unsafe { slice::from_raw_parts(self.ptr, self.len) },
+                        None,
+                        |id| self.cancel.cancel_id.store(id.raw(), Relaxed),
+                    )
+                    .await
+                } else {
+                    ring.read_no_cancel(
+                        Fd::new(self.fd),
+                        self.offset,
+                        unsafe { slice::from_raw_parts_mut(self.ptr, self.len) },
+                        |id| self.cancel.cancel_id.store(id.raw(), Relaxed),
+                    )
+                    .await
+                };
+                match res {
+                    Ok(0) => break Err(ReadWriteJobError::OutOfBounds),
+                    Ok(n) => {
+                        self.len -= n;
+                        self.offset += n;
+                        unsafe {
+                            self.ptr = self.ptr.add(n);
+                        }
+                    }
+                    Err(e) => break Err(ReadWriteJobError::IoUring(e)),
+                }
+            };
+            self.result = Some(res);
+            completion.complete(self)
+        })
+    }
+
+    fn into_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+}
diff --git a/src/cpu_worker/tests.rs b/src/cpu_worker/tests.rs
new file mode 100644
index 00000000..6b0bb826
--- /dev/null
+++ b/src/cpu_worker/tests.rs
@@ -0,0 +1,117 @@
+use {
+    crate::{
+        async_engine::{AsyncEngine, SpawnedFuture},
+        cpu_worker::{AsyncCpuWork, CompletedWork, CpuJob, CpuWork, CpuWorker, WorkCompletion},
+        io_uring::IoUring,
+        utils::asyncevent::AsyncEvent,
+        wheel::Wheel,
+    },
+    std::{any::Any, future::pending, rc::Rc, sync::Arc},
+    uapi::{c::EFD_CLOEXEC, OwnedFd},
+};
+
+struct Job {
+    ae: Rc<AsyncEvent>,
+    work: Work,
+    cancel: bool,
+}
+struct Work(Arc<OwnedFd>);
+struct AsyncWork(Arc<OwnedFd>);
+
+impl CpuJob for Job {
+    fn work(&mut self) -> &mut dyn CpuWork {
+        &mut self.work
+    }
+
+    fn completed(self: Box<Self>) {
+        if self.cancel {
+            unreachable!();
+        } else {
+            self.ae.trigger();
+        }
+    }
+}
+
+impl Drop for Job {
+    fn drop(&mut self) {
+        if self.cancel {
+            self.ae.trigger();
+        }
+    }
+}
+
+impl CpuWork for Work {
+    fn run(&mut self) -> Option<Box<dyn AsyncCpuWork>> {
+        Some(Box::new(AsyncWork(self.0.clone())))
+    }
+
+    fn cancel_async(&mut self, _ring: &Rc<IoUring>) {
+        uapi::eventfd_write(self.0.raw(), 1).unwrap();
+    }
+
+    fn async_work_done(&mut self, work: Box<dyn AsyncCpuWork>) {
+        let _ = work;
+    }
+}
+
+impl AsyncCpuWork for AsyncWork {
+    fn run(
+        self: Box<Self>,
+        eng: &Rc<AsyncEngine>,
+        ring: &Rc<IoUring>,
+        completion: WorkCompletion,
+    ) -> SpawnedFuture<CompletedWork> {
+        let ring = ring.clone();
+        eng.spawn(async move {
+            let mut buf = [0; 8];
+            let res = ring
+                .read_no_cancel(self.0.borrow(), 0, &mut buf, |_| ())
+                .await;
+            res.unwrap();
+            completion.complete(self)
+        })
+    }
+
+    fn into_any(self: Box<Self>) -> Box<dyn Any> {
+        self
+    }
+}
+
+fn run(cancel: bool) {
+    let eng = AsyncEngine::new();
+    let ring = IoUring::new(&eng, 32).unwrap();
+    let ring2 = ring.clone();
+    let wheel = Wheel::new(&eng, &ring).unwrap();
+    let cpu = Rc::new(CpuWorker::new(&ring, &eng).unwrap());
+    let ae = Rc::new(AsyncEvent::default());
+    let eventfd = Arc::new(uapi::eventfd(0, EFD_CLOEXEC).unwrap());
+    let pending_job = cpu.submit(Box::new(Job {
+        ae: ae.clone(),
+        work: Work(eventfd.clone()),
+        cancel,
+    }));
+    let _fut1 = eng.spawn(async move {
+        wheel.timeout(1).await.unwrap();
+        if cancel {
+            drop(pending_job);
+        } else {
+            uapi::eventfd_write(eventfd.raw(), 1).unwrap();
+            pending::<()>().await;
+        }
+    });
+    let _fut2 = eng.spawn(async move {
+        ae.triggered().await;
+        ring2.stop();
+    });
+    ring.run().unwrap();
+}
+
+#[test]
+fn cancel() {
+    run(true);
+}
+
+#[test]
+fn complete() {
+    run(false);
+}
diff --git a/src/cursor.rs b/src/cursor.rs
index 6d8e1d58..8f8d1775 100644
--- a/src/cursor.rs
+++ b/src/cursor.rs
@@ -328,7 +328,8 @@ impl CursorImageScaled {
             extents: Rect::new_sized(-xhot, -yhot, width, height).unwrap(),
             tex: ctx
                 .clone()
-                .shmem_texture(None, data, ARGB8888, width, height, width * 4, None)?,
+                .shmem_texture(None, data, ARGB8888, width, height, width * 4, None)?
+                .into_texture(),
         }))
     }
 }
diff --git a/src/gfx_api.rs b/src/gfx_api.rs
index d8100264..9e7882ec 100644
--- a/src/gfx_api.rs
+++ b/src/gfx_api.rs
@@ -1,11 +1,13 @@
 use {
     crate::{
         allocator::Allocator,
+        clientmem::ClientMemOffset,
+        cpu_worker::CpuWorker,
         cursor::Cursor,
         damage::DamageVisualizer,
         fixed::Fixed,
         format::Format,
-        rect::Rect,
+        rect::{Rect, Region},
         renderer::{renderer_base::RendererBase, RenderResult, Renderer},
         scale::Scale,
         state::State,
@@ -531,6 +533,44 @@ pub trait GfxTexture: Debug {
     fn format(&self) -> &'static Format;
 }
 
+pub trait ShmGfxTexture: GfxTexture {
+    fn into_texture(self: Rc<Self>) -> Rc<dyn GfxTexture>;
+}
+
+pub trait AsyncShmGfxTextureCallback {
+    fn completed(self: Rc<Self>, res: Result<(), GfxError>);
+}
+
+pub trait AsyncShmGfxTextureUploadCancellable {
+    fn cancel(&self, id: u64);
+}
+
+pub struct PendingShmUpload {
+    cancel: Rc<dyn AsyncShmGfxTextureUploadCancellable>,
+    id: u64,
+}
+
+pub trait AsyncShmGfxTexture: GfxTexture {
+    fn async_upload(
+        self: Rc<Self>,
+        callback: Rc<dyn AsyncShmGfxTextureCallback>,
+        mem: &Rc<ClientMemOffset>,
+        damage: Region,
+    ) -> Result<Option<PendingShmUpload>, GfxError>;
+
+    fn sync_upload(self: Rc<Self>, shm: &[Cell<u8>], damage: Region) -> Result<(), GfxError>;
+
+    fn compatible_with(
+        &self,
+        format: &'static Format,
+        width: i32,
+        height: i32,
+        stride: i32,
+    ) -> bool;
+
+    fn into_texture(self: Rc<Self>) -> Rc<dyn GfxTexture>;
+}
+
 pub trait GfxContext: Debug {
     fn reset_status(&self) -> Option<ResetStatus>;
@@ -546,14 +586,23 @@ pub trait GfxContext: Debug {
 
     fn shmem_texture(
         self: Rc<Self>,
-        old: Option<Rc<dyn GfxTexture>>,
+        old: Option<Rc<dyn ShmGfxTexture>>,
         data: &[Cell<u8>],
         format: &'static Format,
         width: i32,
         height: i32,
         stride: i32,
         damage: Option<&[Rect]>,
-    ) -> Result<Rc<dyn GfxTexture>, GfxError>;
+    ) -> Result<Rc<dyn ShmGfxTexture>, GfxError>;
+
+    fn async_shmem_texture(
+        self: Rc<Self>,
+        format: &'static Format,
+        width: i32,
+        height: i32,
+        stride: i32,
+        cpu_worker: &Rc<CpuWorker>,
+    ) -> Result<Rc<dyn AsyncShmGfxTexture>, GfxError>;
 
     fn allocator(&self) -> Rc<dyn Allocator>;
@@ -588,7 +637,7 @@ pub struct GfxFormat {
 
 #[derive(Error)]
 #[error(transparent)]
-pub struct GfxError(pub Box<dyn Error>);
+pub struct GfxError(pub Box<dyn Error + Send>);
 
 impl Debug for GfxError {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
@@ -633,3 +682,15 @@ pub fn cross_intersect_formats(
     }
     res
 }
+
+impl PendingShmUpload {
+    pub fn new(cancel: Rc<dyn AsyncShmGfxTextureUploadCancellable>, id: u64) -> Self {
+        Self { cancel, id }
+    }
+}
+
+impl Drop for PendingShmUpload {
+    fn drop(&mut self) {
+        self.cancel.cancel(self.id);
+    }
+}
diff --git a/src/gfx_apis/gl.rs b/src/gfx_apis/gl.rs
index d8b96c05..22d2a511 100644
--- a/src/gfx_apis/gl.rs
+++ b/src/gfx_apis/gl.rs
@@ -67,6 +67,7 @@ macro_rules!
dynload { use { crate::{ + clientmem::ClientMemError, gfx_api::{ AcquireSync, CopyTexture, FillRect, GfxApiOpt, GfxContext, GfxError, GfxTexture, ReleaseSync, SyncFile, @@ -196,6 +197,8 @@ enum RenderError { WaitSync, #[error("Buffer format {0} is not supported for shm buffers in OpenGL context")] UnsupportedShmFormat(&'static str), + #[error("Could not access the client memory")] + AccessFailed(#[source] ClientMemError), } #[derive(Default)] @@ -318,6 +321,10 @@ fn fill_boxes3(ctx: &GlRenderContext, boxes: &[[f32; 2]], color: &Color) { fn render_texture(ctx: &GlRenderContext, tex: &CopyTexture) { let texture = tex.tex.as_gl(); + if !texture.gl.contents_valid.get() { + log::error!("Ignoring texture with invalid contents"); + return; + } assert!(rc_eq(&ctx.ctx, &texture.ctx.ctx)); let gles = ctx.ctx.dpy.gles; unsafe { diff --git a/src/gfx_apis/gl/gl/texture.rs b/src/gfx_apis/gl/gl/texture.rs index c1c5e943..d6ab3e02 100644 --- a/src/gfx_apis/gl/gl/texture.rs +++ b/src/gfx_apis/gl/gl/texture.rs @@ -21,8 +21,10 @@ pub struct GlTexture { pub tex: GLuint, pub width: i32, pub height: i32, + pub stride: i32, pub external_only: bool, pub format: &'static Format, + pub contents_valid: Cell, } pub fn image_target(external_only: bool) -> GLenum { @@ -60,8 +62,10 @@ impl GlTexture { tex, width: img.dmabuf.width, height: img.dmabuf.height, + stride: 0, external_only: img.external_only, format: img.dmabuf.format, + contents_valid: Cell::new(true), }) } @@ -108,8 +112,10 @@ impl GlTexture { tex, width, height, + stride, external_only: false, format, + contents_valid: Cell::new(true), }) } } diff --git a/src/gfx_apis/gl/renderer/context.rs b/src/gfx_apis/gl/renderer/context.rs index 4d9568af..f242800c 100644 --- a/src/gfx_apis/gl/renderer/context.rs +++ b/src/gfx_apis/gl/renderer/context.rs @@ -1,10 +1,11 @@ use { crate::{ allocator::Allocator, + cpu_worker::CpuWorker, format::{Format, XRGB8888}, gfx_api::{ - BufferResvUser, GfxApiOpt, GfxContext, GfxError, GfxFormat, GfxFramebuffer, GfxImage, - GfxTexture, ResetStatus, + AsyncShmGfxTexture, BufferResvUser, GfxApiOpt, GfxContext, GfxError, GfxFormat, + GfxFramebuffer, GfxImage, ResetStatus, ShmGfxTexture, }, gfx_apis::gl::{ egl::{context::EglContext, display::EglDisplay, image::EglImage}, @@ -265,20 +266,50 @@ impl GfxContext for GlRenderContext { fn shmem_texture( self: Rc, - _old: Option>, + _old: Option>, data: &[Cell], format: &'static Format, width: i32, height: i32, stride: i32, _damage: Option<&[Rect]>, - ) -> Result, GfxError> { + ) -> Result, GfxError> { (&self) .shmem_texture(data, format, width, height, stride) - .map(|w| w as Rc) + .map(|w| w as Rc) .map_err(|e| e.into()) } + fn async_shmem_texture( + self: Rc, + format: &'static Format, + width: i32, + height: i32, + stride: i32, + _cpu_worker: &Rc, + ) -> Result, GfxError> { + let tex = self.ctx.with_current(|| unsafe { + let mut tex = 0; + (self.ctx.dpy.gles.glGenTextures)(1, &mut tex); + Ok(tex) + })?; + Ok(Rc::new(Texture { + gl: GlTexture { + ctx: self.ctx.clone(), + img: None, + tex, + width, + height, + stride, + external_only: false, + format, + contents_valid: Cell::new(false), + }, + ctx: self, + format, + })) + } + fn allocator(&self) -> Rc { self.gbm.clone() } diff --git a/src/gfx_apis/gl/renderer/texture.rs b/src/gfx_apis/gl/renderer/texture.rs index 003fe6be..efa45b84 100644 --- a/src/gfx_apis/gl/renderer/texture.rs +++ b/src/gfx_apis/gl/renderer/texture.rs @@ -1,12 +1,21 @@ use { crate::{ + clientmem::ClientMemOffset, format::Format, - gfx_api::{GfxError, GfxTexture}, + 
gfx_api::{ + AsyncShmGfxTexture, AsyncShmGfxTextureCallback, GfxError, GfxTexture, PendingShmUpload, + ShmGfxTexture, + }, gfx_apis::gl::{ gl::texture::GlTexture, renderer::{context::GlRenderContext, framebuffer::Framebuffer}, + sys::{ + GLint, GL_CLAMP_TO_EDGE, GL_TEXTURE_2D, GL_TEXTURE_WRAP_S, GL_TEXTURE_WRAP_T, + GL_UNPACK_ROW_LENGTH_EXT, + }, RenderError, }, + rect::Region, video::dmabuf::DmaBuf, }, std::{ @@ -82,3 +91,72 @@ impl GfxTexture for Texture { self.format } } + +impl ShmGfxTexture for Texture { + fn into_texture(self: Rc) -> Rc { + self + } +} + +impl AsyncShmGfxTexture for Texture { + fn async_upload( + self: Rc, + _callback: Rc, + mem: &Rc, + damage: Region, + ) -> Result, GfxError> { + mem.access(|data| self.clone().sync_upload(data, damage)) + .map_err(RenderError::AccessFailed)??; + Ok(None) + } + + fn sync_upload(self: Rc, data: &[Cell], _damage: Region) -> Result<(), GfxError> { + let shm_info = self.format.shm_info.as_ref().unwrap(); + if (self.gl.stride * self.gl.height) as usize > data.len() { + return Err(RenderError::SmallImageBuffer.into()); + } + let gles = self.ctx.ctx.dpy.gles; + self.ctx.ctx.with_current(|| unsafe { + (gles.glBindTexture)(GL_TEXTURE_2D, self.gl.tex); + (gles.glTexParameteri)(GL_TEXTURE_2D, GL_TEXTURE_WRAP_S, GL_CLAMP_TO_EDGE); + (gles.glTexParameteri)(GL_TEXTURE_2D, GL_TEXTURE_WRAP_T, GL_CLAMP_TO_EDGE); + (gles.glPixelStorei)( + GL_UNPACK_ROW_LENGTH_EXT, + self.gl.stride / shm_info.bpp as GLint, + ); + (gles.glTexImage2D)( + GL_TEXTURE_2D, + 0, + shm_info.gl_format, + self.gl.width, + self.gl.height, + 0, + shm_info.gl_format as _, + shm_info.gl_type as _, + data.as_ptr() as _, + ); + (gles.glPixelStorei)(GL_UNPACK_ROW_LENGTH_EXT, 0); + (gles.glBindTexture)(GL_TEXTURE_2D, 0); + Ok(()) + })?; + self.gl.contents_valid.set(true); + Ok(()) + } + + fn compatible_with( + &self, + format: &'static Format, + width: i32, + height: i32, + stride: i32, + ) -> bool { + format == self.gl.format + && width == self.gl.width + && height == self.gl.height + && stride == self.gl.stride + } + + fn into_texture(self: Rc) -> Rc { + self + } +} diff --git a/src/gfx_apis/vulkan.rs b/src/gfx_apis/vulkan.rs index 8b1cae35..4562377b 100644 --- a/src/gfx_apis/vulkan.rs +++ b/src/gfx_apis/vulkan.rs @@ -14,15 +14,16 @@ mod semaphore; mod shaders; mod shm_image; mod staging; -mod util; use { crate::{ allocator::{Allocator, AllocatorError}, async_engine::AsyncEngine, + cpu_worker::{jobs::read_write::ReadWriteJobError, CpuWorker}, format::Format, gfx_api::{ - GfxContext, GfxError, GfxFormat, GfxFramebuffer, GfxImage, GfxTexture, ResetStatus, + AsyncShmGfxTexture, GfxContext, GfxError, GfxFormat, GfxFramebuffer, GfxImage, + ResetStatus, ShmGfxTexture, }, gfx_apis::vulkan::{ image::VulkanImageMemory, instance::VulkanInstance, renderer::VulkanRenderer, @@ -196,6 +197,12 @@ pub enum VulkanError { WaitIdle(#[source] vk::Result), #[error("Could not dup a DRM device")] DupDrm(#[source] DrmError), + #[error("Graphics context has already been dropped")] + Defunct, + #[error("Could not perform an async copy to the staging buffer")] + AsyncCopyToStaging(#[source] ReadWriteJobError), + #[error("The async shm texture is busy")] + AsyncCopyBusy, } impl From for GfxError { @@ -250,16 +257,16 @@ impl GfxContext for Context { fn shmem_texture( self: Rc, - old: Option>, + old: Option>, data: &[Cell], format: &'static Format, width: i32, height: i32, stride: i32, damage: Option<&[Rect]>, - ) -> Result, GfxError> { + ) -> Result, GfxError> { if let Some(old) = old { - let old = 
old.into_vk(&self.0.device.device); + let old = old.into_texture().into_vk(&self.0.device.device); let shm = match &old.ty { VulkanImageMemory::DmaBuf(_) => unreachable!(), VulkanImageMemory::Internal(shm) => shm, @@ -275,10 +282,30 @@ impl GfxContext for Context { } let tex = self .0 - .create_shm_texture(format, width, height, stride, data, false)?; + .create_shm_texture(format, width, height, stride, data, false, None)?; Ok(tex as _) } + fn async_shmem_texture( + self: Rc, + format: &'static Format, + width: i32, + height: i32, + stride: i32, + cpu_worker: &Rc, + ) -> Result, GfxError> { + let tex = self.0.create_shm_texture( + format, + width, + height, + stride, + &[], + false, + Some(cpu_worker), + )?; + Ok(tex) + } + fn allocator(&self) -> Rc { self.0.device.gbm.clone() } @@ -296,7 +323,7 @@ impl GfxContext for Context { ) -> Result, GfxError> { let fb = self .0 - .create_shm_texture(format, width, height, stride, &[], true)?; + .create_shm_texture(format, width, height, stride, &[], true, None)?; Ok(fb) } diff --git a/src/gfx_apis/vulkan/allocator.rs b/src/gfx_apis/vulkan/allocator.rs index 83f661ba..6cff0834 100644 --- a/src/gfx_apis/vulkan/allocator.rs +++ b/src/gfx_apis/vulkan/allocator.rs @@ -1,53 +1,122 @@ use { crate::{ - gfx_apis::vulkan::{device::VulkanDevice, instance::API_VERSION, VulkanError}, + cpu_worker::{AsyncCpuWork, CpuJob, CpuWork, CpuWorker}, + gfx_apis::vulkan::{ + device::VulkanDevice, instance::API_VERSION, renderer::VulkanRenderer, VulkanError, + }, utils::{numcell::NumCell, ptr_ext::MutPtrExt}, }, - ash::vk::{DeviceMemory, DeviceSize, MemoryRequirements}, - gpu_alloc::{Config, GpuAllocator, MemoryBlock, Request, UsageFlags}, + ash::{ + vk::{DeviceMemory, DeviceSize, MemoryRequirements}, + Device, + }, + gpu_alloc::{Config, GpuAllocator, MemoryBlock, MemoryPropertyFlags, Request, UsageFlags}, gpu_alloc_ash::AshMemoryDevice, + parking_lot::Mutex, std::{ cell::{Cell, UnsafeCell}, rc::Rc, + sync::Arc, }, }; -pub struct VulkanAllocator { - pub(super) device: Rc, - pub(super) non_coherent_atom_mask: u64, +pub struct SyncAllocatorStorage { + allocator: Arc>>, + device: Rc, +} + +pub struct UnsyncAllocatorStorage { allocator: UnsafeCell>, + device: Rc, +} + +pub struct VulkanAllocatorType { + storage: T, + non_coherent_atom_mask: u64, total: NumCell, } +pub type VulkanAllocator = VulkanAllocatorType; +pub type VulkanThreadedAllocator = VulkanAllocatorType; + +enum AllocatorType { + Local(Rc), + Threaded { + allocator: Rc, + renderer: Rc, + cpu: Rc, + }, +} + pub struct VulkanAllocation { - pub(super) allocator: Rc, + allocator: AllocatorType, pub(super) memory: DeviceMemory, pub(super) offset: DeviceSize, pub(super) mem: Option<*mut u8>, pub(super) size: DeviceSize, + pub(super) coherency_mask: Option, block: Cell>>, } +impl VulkanAllocation { + unsafe fn free_locally( + &self, + allocator: &VulkanAllocatorType, + device: &VulkanDevice, + gpu: &mut GpuAllocator, + ) { + allocator.total.fetch_sub(self.size); + let block = self.block.take().unwrap(); + do_free(gpu, &device.device, block, self.mem); + } +} + impl Drop for VulkanAllocation { fn drop(&mut self) { unsafe { - self.allocator.total.fetch_sub(self.size); - let mut block = self.block.take().unwrap(); - if let Some(_ptr) = self.mem { - // log::info!("free = {:?} - {:?} ({})", ptr, ptr.add(block.size() as usize), block.size()); - block.unmap(AshMemoryDevice::wrap(&self.allocator.device.device)); + match &self.allocator { + AllocatorType::Local(a) => self.free_locally(a, &a.storage.device, a.allocator()), + 
AllocatorType::Threaded { + allocator, + renderer, + cpu, + } => { + if renderer.defunct.get() { + self.free_locally( + allocator, + &allocator.storage.device, + &mut allocator.storage.allocator.lock(), + ); + } else { + let id = renderer.allocate_point(); + let job = FreeJob { + id, + renderer: renderer.clone(), + allocator: allocator.clone(), + size: self.size, + work: FreeWork { + device: allocator.storage.device.device.clone(), + allocator: allocator.storage.allocator.clone(), + allocation: Some(UnsafeAllocation { + block: self.block.take().unwrap(), + ptr: self.mem, + }), + }, + }; + let pending = cpu.submit(Box::new(job)); + renderer.pending_cpu_jobs.set(id, pending); + } + } } - self.allocator - .allocator - .get() - .deref_mut() - .dealloc(AshMemoryDevice::wrap(&self.allocator.device.device), block); } } } impl VulkanDevice { - pub fn create_allocator(self: &Rc) -> Result, VulkanError> { + fn create_allocator_( + self: &Rc, + map: impl FnOnce(GpuAllocator) -> T, + ) -> Result>, VulkanError> { let config = Config::i_am_prototyping(); let props = unsafe { gpu_alloc_ash::device_properties( @@ -60,18 +129,56 @@ impl VulkanDevice { props.buffer_device_address = false; let non_coherent_atom_size = props.non_coherent_atom_size; let allocator = GpuAllocator::new(config, props); - Ok(Rc::new(VulkanAllocator { - device: self.clone(), + Ok(Rc::new(VulkanAllocatorType { non_coherent_atom_mask: non_coherent_atom_size - 1, - allocator: UnsafeCell::new(allocator), + storage: map(allocator), total: Default::default(), })) } + + pub fn create_allocator(self: &Rc) -> Result, VulkanError> { + self.create_allocator_(|a| UnsyncAllocatorStorage { + allocator: UnsafeCell::new(a), + device: self.clone(), + }) + } + + pub fn create_threaded_allocator( + self: &Rc, + ) -> Result, VulkanError> { + self.create_allocator_(|a| SyncAllocatorStorage { + allocator: Arc::new(Mutex::new(a)), + device: self.clone(), + }) + } +} + +impl VulkanAllocatorType { + fn commit_allocation( + self: &Rc, + ua: UnsafeAllocation, + allocator: AllocatorType, + ) -> VulkanAllocation { + let UnsafeAllocation { block, ptr } = ua; + self.total.fetch_add(block.size()); + VulkanAllocation { + allocator, + memory: *block.memory(), + offset: block.offset(), + mem: ptr, + size: block.size(), + coherency_mask: match block.props().contains(MemoryPropertyFlags::HOST_COHERENT) { + true => None, + false => Some(self.non_coherent_atom_mask), + }, + block: Cell::new(Some(block)), + } + } } impl VulkanAllocator { fn allocator(&self) -> &mut GpuAllocator { - unsafe { self.allocator.get().deref_mut() } + unsafe { self.storage.allocator.get().deref_mut() } } pub fn alloc( @@ -80,49 +187,202 @@ impl VulkanAllocator { usage: UsageFlags, map: bool, ) -> Result { - let request = Request { - size: req.size, - align_mask: req.alignment - 1, + let ua = do_alloc( + self.allocator(), + &self.storage.device.device, + req, usage, - memory_types: req.memory_type_bits, - }; - let block = unsafe { - self.allocator() - .alloc(AshMemoryDevice::wrap(&self.device.device), request) - }; - let mut block = block.map_err(VulkanError::AllocateMemory2)?; - let ptr = match map { - true => { - let ptr = unsafe { - block.map( - AshMemoryDevice::wrap(&self.device.device), - 0, - block.size() as usize, - ) - }; - Some(ptr.map_err(VulkanError::MapMemory)?.as_ptr()) - } - false => None, - }; - self.total.fetch_add(block.size()); - Ok(VulkanAllocation { + map, + )?; + Ok(self.commit_allocation(ua, AllocatorType::Local(self.clone()))) + } +} + +impl VulkanThreadedAllocator { + pub 
fn async_alloc( + self: &Rc, + renderer: &Rc, + cpu: &Rc, + req: MemoryRequirements, + usage: UsageFlags, + map: bool, + cb: impl FnOnce(Result) + 'static, + ) -> Result<(), VulkanError> { + renderer.check_defunct()?; + let id = renderer.allocate_point(); + let job = AllocJob { + id, + renderer: renderer.clone(), + cpu: cpu.clone(), allocator: self.clone(), - memory: *block.memory(), - offset: block.offset(), - mem: ptr, - size: block.size(), - block: Cell::new(Some(block)), - }) + cb: Some(cb), + work: AllocWork { + req, + usage, + map, + device: self.storage.device.device.clone(), + allocator: self.storage.allocator.clone(), + res: None, + }, + }; + let pending = cpu.submit(Box::new(job)); + renderer.pending_cpu_jobs.set(id, pending); + Ok(()) + } +} + +struct AllocJob { + id: u64, + renderer: Rc, + cpu: Rc, + allocator: Rc, + cb: Option, + work: AllocWork, +} + +struct AllocWork { + req: MemoryRequirements, + usage: UsageFlags, + map: bool, + device: Arc, + allocator: Arc>>, + res: Option>, +} + +impl CpuWork for AllocWork { + fn run(&mut self) -> Option> { + let r = do_alloc( + &mut self.allocator.lock(), + &self.device, + &self.req, + self.usage, + self.map, + ); + self.res = Some(r); + None + } +} + +impl CpuJob for AllocJob +where + T: FnOnce(Result), +{ + fn work(&mut self) -> &mut dyn CpuWork { + &mut self.work + } + + fn completed(mut self: Box) { + self.renderer.pending_cpu_jobs.remove(&self.id); + let res = self.work.res.take().unwrap().map(|ua| { + self.allocator.commit_allocation( + ua, + AllocatorType::Threaded { + allocator: self.allocator.clone(), + renderer: self.renderer.clone(), + cpu: self.cpu.clone(), + }, + ) + }); + self.cb.take().unwrap()(res); + } +} + +struct FreeJob { + id: u64, + renderer: Rc, + allocator: Rc, + size: u64, + work: FreeWork, +} + +struct FreeWork { + device: Arc, + allocator: Arc>>, + allocation: Option, +} + +impl CpuWork for FreeWork { + fn run(&mut self) -> Option> { + let ua = self.allocation.take().unwrap(); + unsafe { + do_free(&mut self.allocator.lock(), &self.device, ua.block, ua.ptr); + } + None + } +} + +impl CpuJob for FreeJob { + fn work(&mut self) -> &mut dyn CpuWork { + &mut self.work + } + + fn completed(self: Box) { + self.renderer.pending_cpu_jobs.remove(&self.id); + self.allocator.total.fetch_sub(self.size); + } +} + +pub struct UnsafeAllocation { + block: MemoryBlock, + ptr: Option<*mut u8>, +} + +unsafe impl Send for UnsafeAllocation {} + +fn do_alloc( + allocator: &mut GpuAllocator, + device: &Device, + req: &MemoryRequirements, + usage: UsageFlags, + map: bool, +) -> Result { + let request = Request { + size: req.size, + align_mask: req.alignment - 1, + usage, + memory_types: req.memory_type_bits, + }; + let device = AshMemoryDevice::wrap(device); + let block = unsafe { allocator.alloc(device, request) }; + let mut block = block.map_err(VulkanError::AllocateMemory2)?; + let ptr = match map { + true => { + let ptr = unsafe { block.map(device, 0, block.size() as usize) }; + Some(ptr.map_err(VulkanError::MapMemory)?.as_ptr()) + } + false => None, + }; + Ok(UnsafeAllocation { block, ptr }) +} + +unsafe fn do_free( + gpu: &mut GpuAllocator, + device: &Device, + mut block: MemoryBlock, + ptr: Option<*mut u8>, +) { + let device = AshMemoryDevice::wrap(device); + if let Some(_ptr) = ptr { + // log::info!("free = {:?} - {:?} ({})", ptr, ptr.add(block.size() as usize), block.size()); + block.unmap(device); + } + gpu.dealloc(device, block); +} + +impl Drop for UnsyncAllocatorStorage { + fn drop(&mut self) { + let device = 
AshMemoryDevice::wrap(&self.device.device); + unsafe { + self.allocator.get_mut().cleanup(device); + } } } -impl Drop for VulkanAllocator { +impl Drop for SyncAllocatorStorage { fn drop(&mut self) { + let device = AshMemoryDevice::wrap(&self.device.device); unsafe { - self.allocator - .get() - .deref_mut() - .cleanup(AshMemoryDevice::wrap(&self.device.device)); + self.allocator.lock().cleanup(device); } } } diff --git a/src/gfx_apis/vulkan/bo_allocator.rs b/src/gfx_apis/vulkan/bo_allocator.rs index 1874c72f..f89da55b 100644 --- a/src/gfx_apis/vulkan/bo_allocator.rs +++ b/src/gfx_apis/vulkan/bo_allocator.rs @@ -8,9 +8,9 @@ use { gfx_apis::vulkan::{ allocator::VulkanAllocator, command::VulkanCommandBuffer, device::VulkanDevice, format::VulkanFormat, renderer::image_barrier, staging::VulkanStagingBuffer, - util::OnDrop, VulkanError, + VulkanError, }, - utils::errorfmt::ErrorFmt, + utils::{errorfmt::ErrorFmt, on_drop::OnDrop}, video::{ dmabuf::{DmaBuf, DmaBufIds, DmaBufPlane, PlaneVec}, drm::Drm, diff --git a/src/gfx_apis/vulkan/device.rs b/src/gfx_apis/vulkan/device.rs index 8e23fabe..8792a8ac 100644 --- a/src/gfx_apis/vulkan/device.rs +++ b/src/gfx_apis/vulkan/device.rs @@ -7,9 +7,9 @@ use { map_extension_properties, ApiVersionDisplay, Extensions, VulkanInstance, API_VERSION, }, - util::OnDrop, VulkanError, }, + utils::on_drop::OnDrop, video::{ drm::{sync_obj::SyncObjCtx, Drm}, gbm::GbmDevice, @@ -42,6 +42,7 @@ use { std::{ ffi::{CStr, CString}, rc::Rc, + sync::Arc, }, uapi::Ustr, }; @@ -52,7 +53,7 @@ pub struct VulkanDevice { pub(super) gbm: Rc, pub(super) sync_ctx: Rc, pub(super) instance: Rc, - pub(super) device: Device, + pub(super) device: Arc, pub(super) external_memory_fd: external_memory_fd::Device, pub(super) external_semaphore_fd: external_semaphore_fd::Device, pub(super) external_fence_fd: external_fence_fd::Device, @@ -292,7 +293,7 @@ impl VulkanInstance { sync_ctx: Rc::new(SyncObjCtx::new(gbm.drm.fd())), gbm: Rc::new(gbm), instance: self.clone(), - device, + device: Arc::new(device), external_memory_fd, external_semaphore_fd, external_fence_fd, diff --git a/src/gfx_apis/vulkan/image.rs b/src/gfx_apis/vulkan/image.rs index d92ad53a..53c26af4 100644 --- a/src/gfx_apis/vulkan/image.rs +++ b/src/gfx_apis/vulkan/image.rs @@ -1,13 +1,19 @@ use { crate::{ + clientmem::ClientMemOffset, format::Format, - gfx_api::{GfxApiOpt, GfxError, GfxFramebuffer, GfxImage, GfxTexture, SyncFile}, + gfx_api::{ + AsyncShmGfxTexture, AsyncShmGfxTextureCallback, AsyncShmGfxTextureUploadCancellable, + GfxApiOpt, GfxError, GfxFramebuffer, GfxImage, GfxTexture, PendingShmUpload, + ShmGfxTexture, SyncFile, + }, gfx_apis::vulkan::{ allocator::VulkanAllocation, device::VulkanDevice, format::VulkanModifierLimits, - renderer::VulkanRenderer, shm_image::VulkanShmImage, util::OnDrop, VulkanError, + renderer::VulkanRenderer, shm_image::VulkanShmImage, VulkanError, }, + rect::Region, theme::Color, - utils::clonecell::CloneCell, + utils::{clonecell::CloneCell, on_drop::OnDrop}, video::dmabuf::{DmaBuf, PlaneVec}, }, ash::vk::{ @@ -51,6 +57,7 @@ pub struct VulkanImage { pub(super) render_view: Option, pub(super) image: Image, pub(super) is_undefined: Cell, + pub(super) contents_are_undefined: Cell, pub(super) ty: VulkanImageMemory, pub(super) render_ops: CloneCell>, pub(super) bridge: Option, @@ -378,6 +385,7 @@ impl VulkanDmaBufImageTemplate { }), format: self.dmabuf.format, is_undefined: Cell::new(true), + contents_are_undefined: Cell::new(false), bridge, })) } @@ -530,3 +538,64 @@ impl GfxTexture for VulkanImage { 
self.format } } + +impl ShmGfxTexture for VulkanImage { + fn into_texture(self: Rc) -> Rc { + self + } +} + +impl AsyncShmGfxTexture for VulkanImage { + fn async_upload( + self: Rc, + callback: Rc, + mem: &Rc, + damage: Region, + ) -> Result, GfxError> { + let VulkanImageMemory::Internal(shm) = &self.ty else { + unreachable!(); + }; + let pending = shm.async_upload(&self, mem, damage, callback)?; + Ok(pending) + } + + fn sync_upload(self: Rc, mem: &[Cell], damage: Region) -> Result<(), GfxError> { + let VulkanImageMemory::Internal(shm) = &self.ty else { + unreachable!(); + }; + if shm.async_data.as_ref().unwrap().busy.get() { + return Err(VulkanError::AsyncCopyBusy.into()); + } + shm.upload(&self, mem, Some(damage.rects()))?; + Ok(()) + } + + fn compatible_with( + &self, + format: &'static Format, + width: i32, + height: i32, + stride: i32, + ) -> bool { + self.format == format + && self.width == width as u32 + && self.height == height as u32 + && self.stride == stride as u32 + } + + fn into_texture(self: Rc) -> Rc { + self + } +} + +impl AsyncShmGfxTextureUploadCancellable for VulkanImage { + fn cancel(&self, id: u64) { + let VulkanImageMemory::Internal(shm) = &self.ty else { + unreachable!(); + }; + let data = shm.async_data.as_ref().unwrap(); + if data.callback_id.get() == id { + data.callback.take(); + } + } +} diff --git a/src/gfx_apis/vulkan/instance.rs b/src/gfx_apis/vulkan/instance.rs index aba36851..bb39f3c3 100644 --- a/src/gfx_apis/vulkan/instance.rs +++ b/src/gfx_apis/vulkan/instance.rs @@ -1,5 +1,8 @@ use { - crate::gfx_apis::vulkan::{util::OnDrop, VulkanError, VULKAN_VALIDATION}, + crate::{ + gfx_apis::vulkan::{VulkanError, VULKAN_VALIDATION}, + utils::on_drop::OnDrop, + }, ahash::{AHashMap, AHashSet}, ash::{ ext::{debug_utils, validation_features}, diff --git a/src/gfx_apis/vulkan/pipeline.rs b/src/gfx_apis/vulkan/pipeline.rs index 037e1692..3d495f39 100644 --- a/src/gfx_apis/vulkan/pipeline.rs +++ b/src/gfx_apis/vulkan/pipeline.rs @@ -1,7 +1,10 @@ use { - crate::gfx_apis::vulkan::{ - descriptor::VulkanDescriptorSetLayout, device::VulkanDevice, shaders::VulkanShader, - util::OnDrop, VulkanError, + crate::{ + gfx_apis::vulkan::{ + descriptor::VulkanDescriptorSetLayout, device::VulkanDevice, shaders::VulkanShader, + VulkanError, + }, + utils::on_drop::OnDrop, }, arrayvec::ArrayVec, ash::{ diff --git a/src/gfx_apis/vulkan/renderer.rs b/src/gfx_apis/vulkan/renderer.rs index d884d76f..0018700b 100644 --- a/src/gfx_apis/vulkan/renderer.rs +++ b/src/gfx_apis/vulkan/renderer.rs @@ -1,13 +1,14 @@ use { crate::{ async_engine::{AsyncEngine, SpawnedFuture}, + cpu_worker::PendingJob, format::{Format, XRGB8888}, gfx_api::{ AcquireSync, BufferResv, BufferResvUser, GfxApiOpt, GfxFormat, GfxFramebuffer, GfxTexture, GfxWriteModifier, ReleaseSync, SyncFile, }, gfx_apis::vulkan::{ - allocator::VulkanAllocator, + allocator::{VulkanAllocator, VulkanThreadedAllocator}, command::{VulkanCommandBuffer, VulkanCommandPool}, descriptor::VulkanDescriptorSetLayout, device::VulkanDevice, @@ -78,6 +79,9 @@ pub struct VulkanRenderer { pub(super) tex_frag_mult_opaque_shader: Rc, pub(super) tex_frag_mult_alpha_shader: Rc, pub(super) tex_descriptor_set_layout: Rc, + pub(super) defunct: Cell, + pub(super) pending_cpu_jobs: CopyHashMap, + pub(super) shm_allocator: Rc, } pub(super) struct UsedTexture { @@ -172,6 +176,7 @@ impl VulkanDevice { }) .collect(); let allocator = self.create_allocator()?; + let shm_allocator = self.create_threaded_allocator()?; let render = Rc::new(VulkanRenderer { formats: 
Rc::new(formats), device: self.clone(), @@ -195,6 +200,9 @@ impl VulkanDevice { tex_frag_mult_opaque_shader, tex_frag_mult_alpha_shader, tex_descriptor_set_layout, + defunct: Cell::new(false), + pending_cpu_jobs: Default::default(), + shm_allocator, }); render.get_or_create_pipelines(XRGB8888.vk_format)?; Ok(render) @@ -271,6 +279,9 @@ impl VulkanRenderer { for cmd in opts { if let GfxApiOpt::CopyTexture(c) = cmd { let tex = c.tex.clone().into_vk(&self.device.device); + if tex.contents_are_undefined.get() { + continue; + } if let VulkanImageMemory::DmaBuf(_) = &tex.ty { memory.sample.push(tex.clone()) } @@ -450,6 +461,10 @@ impl VulkanRenderer { } GfxApiOpt::CopyTexture(c) => { let tex = c.tex.as_vk(&self.device.device); + if tex.contents_are_undefined.get() { + log::warn!("Ignoring undefined texture"); + continue; + } let copy_type = match c.alpha.is_some() { true => TexCopyType::Multiply, false => TexCopyType::Identity, @@ -799,6 +814,7 @@ impl VulkanRenderer { stride as i32, &[], true, + None, )?; (&*tmp_tex as &dyn GfxFramebuffer) .copy_texture( @@ -996,6 +1012,7 @@ impl VulkanRenderer { opts: &[GfxApiOpt], clear: Option<&Color>, ) -> Result<(), VulkanError> { + self.check_defunct()?; let buf = self.allocate_command_buffer()?; self.collect_memory(opts); self.begin_command_buffer(buf.buffer)?; @@ -1025,6 +1042,7 @@ impl VulkanRenderer { } pub fn on_drop(&self) { + self.defunct.set(true); let mut pending_frames = self.pending_frames.lock(); let mut pending_uploads = self.pending_uploads.lock(); if pending_frames.is_not_empty() || pending_uploads.is_not_empty() { @@ -1037,6 +1055,13 @@ impl VulkanRenderer { pending_frames.clear(); pending_uploads.clear(); } + + pub(super) fn check_defunct(&self) -> Result<(), VulkanError> { + match self.defunct.get() { + true => Err(VulkanError::Defunct), + false => Ok(()), + } + } } impl Debug for VulkanRenderer { diff --git a/src/gfx_apis/vulkan/shm_image.rs b/src/gfx_apis/vulkan/shm_image.rs index e272eb87..24ef74e5 100644 --- a/src/gfx_apis/vulkan/shm_image.rs +++ b/src/gfx_apis/vulkan/shm_image.rs @@ -1,7 +1,15 @@ use { crate::{ + clientmem::ClientMemOffset, + cpu_worker::{ + jobs::{ + img_copy::ImgCopyWork, + read_write::{ReadWriteJobError, ReadWriteWork}, + }, + CpuJob, CpuWork, CpuWorker, + }, format::{Format, FormatShmInfo}, - gfx_api::SyncFile, + gfx_api::{AsyncShmGfxTextureCallback, PendingShmUpload, SyncFile}, gfx_apis::vulkan::{ allocator::VulkanAllocation, command::VulkanCommandBuffer, @@ -9,11 +17,10 @@ use { image::{VulkanImage, VulkanImageMemory}, renderer::{image_barrier, VulkanRenderer}, staging::VulkanStagingBuffer, - util::OnDrop, VulkanError, }, - rect::Rect, - utils::errorfmt::ErrorFmt, + rect::{Rect, Region}, + utils::{clonecell::CloneCell, errorfmt::ErrorFmt, on_drop::OnDrop}, }, ash::vk::{ AccessFlags2, BufferImageCopy2, BufferMemoryBarrier2, CommandBufferBeginInfo, @@ -25,14 +32,32 @@ use { }, gpu_alloc::UsageFlags, isnt::std_1::primitive::IsntSliceExt, - std::{cell::Cell, ptr, rc::Rc, slice}, + std::{ + cell::{Cell, RefCell}, + ptr, + rc::Rc, + slice, + }, + uapi::OwnedFd, }; pub struct VulkanShmImage { - pub(super) _size: DeviceSize, + pub(super) size: DeviceSize, pub(super) stride: u32, pub(super) _allocation: VulkanAllocation, pub(super) shm_info: &'static FormatShmInfo, + pub(super) async_data: Option, +} + +pub struct VulkanShmImageAsyncData { + pub(super) busy: Cell, + pub(super) io_job: Cell>>, + pub(super) copy_job: Cell>>, + pub(super) staging: CloneCell>>, + pub(super) callback: Cell>>, + pub(super) callback_id: Cell, 
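+    /// Copy regions computed from the current damage; kept here so that an
+    /// upload can resume after the staging buffer has been allocated.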
+ pub(super) regions: RefCell>>, + pub(super) cpu: Rc, } impl VulkanShmImage { @@ -42,6 +67,7 @@ impl VulkanShmImage { buffer: &[Cell], damage: Option<&[Rect]>, ) -> Result<(), VulkanError> { + img.renderer.check_defunct()?; if let Some(damage) = damage { if damage.is_empty() { return Ok(()); @@ -133,6 +159,30 @@ impl VulkanShmImage { ptr::copy_nonoverlapping(buf, mem, total_size as usize); } })?; + let Some((cmd, fence, sync_file, point)) = + self.submit_buffer_to_image_copy(img, &staging, cpy)? + else { + return Ok(()); + }; + let future = img.renderer.eng.spawn(await_upload( + point, + img.clone(), + cmd, + sync_file, + fence, + staging, + )); + img.renderer.pending_uploads.set(point, future); + Ok(()) + } + + fn submit_buffer_to_image_copy( + &self, + img: &Rc, + staging: &VulkanStagingBuffer, + regions: &[BufferImageCopy2], + ) -> Result, Rc, SyncFile, u64)>, VulkanError> + { let memory_barrier = |sam, ssm, dam, dsm| { BufferMemoryBarrier2::default() .buffer(staging.buffer) @@ -185,7 +235,7 @@ impl VulkanShmImage { .src_buffer(staging.buffer) .dst_image(img.image) .dst_image_layout(ImageLayout::TRANSFER_DST_OPTIMAL) - .regions(cpy); + .regions(regions); let cmd = img.renderer.allocate_command_buffer()?; let dev = &img.renderer.device.device; let command_buffer_info = CommandBufferSubmitInfo::default().command_buffer(cmd.buffer); @@ -210,35 +260,341 @@ impl VulkanShmImage { .map_err(VulkanError::Submit)?; } img.is_undefined.set(false); + img.contents_are_undefined.set(false); let release_sync_file = match release_fence.export_sync_file() { Ok(s) => s, Err(e) => { log::error!("Could not export sync file from fence: {}", ErrorFmt(e)); img.renderer.block(); - return Ok(()); + return Ok(None); } }; let point = img.renderer.allocate_point(); - let future = img.renderer.eng.spawn(await_upload( + Ok(Some((cmd, release_fence, release_sync_file, point))) + } +} + +async fn await_upload( + id: u64, + img: Rc, + buf: Rc, + sync_file: SyncFile, + _fence: Rc, + _staging: VulkanStagingBuffer, +) { + let res = img.renderer.ring.readable(&sync_file.0).await; + if let Err(e) = res { + log::error!( + "Could not wait for sync file to become readable: {}", + ErrorFmt(e) + ); + img.renderer.block(); + } + img.renderer.command_buffers.push(buf); + img.renderer.pending_uploads.remove(&id); +} + +impl VulkanShmImageAsyncData { + fn complete(&self, result: Result<(), VulkanError>) { + self.busy.set(false); + if let Some(cb) = self.callback.take() { + cb.completed(result.map_err(|e| e.into())); + } + } +} + +impl VulkanShmImage { + pub fn async_upload( + &self, + img: &Rc, + client_mem: &Rc, + damage: Region, + callback: Rc, + ) -> Result, VulkanError> { + let data = self.async_data.as_ref().unwrap(); + let res = self.try_async_upload(img, data, client_mem, damage); + match res { + Ok(()) => { + let id = img.renderer.allocate_point(); + data.callback_id.set(id); + data.callback.set(Some(callback)); + Ok(Some(PendingShmUpload::new(img.clone(), id))) + } + Err(e) => Err(e), + } + } + + fn try_async_upload( + &self, + img: &Rc, + data: &VulkanShmImageAsyncData, + client_mem: &Rc, + mut damage: Region, + ) -> Result<(), VulkanError> { + if data.busy.get() { + return Err(VulkanError::AsyncCopyBusy); + } + if self.size > client_mem.ptr().len() as u64 { + return Err(VulkanError::InvalidBufferSize); + } + data.busy.set(true); + if img.contents_are_undefined.get() { + damage = Region::new2(Rect::new_sized(0, 0, img.width as _, img.height as _).unwrap()) + } + + let copies = &mut *data.regions.borrow_mut(); + 
+        copies.clear();
+
+        let mut copy = |x, y, width, height| {
+            let buffer_offset = (y as u32 * img.stride + x as u32 * self.shm_info.bpp) as u64;
+            let copy = BufferImageCopy2::default()
+                .buffer_offset(buffer_offset)
+                .image_offset(Offset3D { x, y, z: 0 })
+                .image_extent(Extent3D {
+                    width,
+                    height,
+                    depth: 1,
+                })
+                .image_subresource(ImageSubresourceLayers {
+                    aspect_mask: ImageAspectFlags::COLOR,
+                    mip_level: 0,
+                    base_array_layer: 0,
+                    layer_count: 1,
+                })
+                .buffer_image_height(img.height)
+                .buffer_row_length(img.stride / self.shm_info.bpp);
+            copies.push(copy);
+        };
+        for damage in damage.rects() {
+            let Some(damage) = Rect::new(
+                damage.x1().max(0),
+                damage.y1().max(0),
+                damage.x2().min(img.width as i32),
+                damage.y2().min(img.height as i32),
+            ) else {
+                continue;
+            };
+            if damage.is_empty() {
+                continue;
+            }
+            copy(
+                damage.x1(),
+                damage.y1(),
+                damage.width() as u32,
+                damage.height() as u32,
+            );
+        }
+
+        if let Some(staging) = data.staging.get() {
+            return self.async_upload_initiate_copy(img, data, &staging, copies, client_mem);
+        }
+
+        let img2 = img.clone();
+        let client_mem = client_mem.clone();
+        img.renderer.device.create_shm_staging(
+            &img.renderer,
+            &data.cpu,
+            self.size,
+            true,
+            false,
+            move |res| {
+                let VulkanImageMemory::Internal(shm) = &img2.ty else {
+                    unreachable!();
+                };
+                if let Err(e) = shm.async_upload_after_allocation(&img2, &client_mem, res) {
+                    shm.async_data.as_ref().unwrap().complete(Err(e));
+                }
+            },
+        )
+    }
+
+    fn async_upload_after_allocation(
+        &self,
+        img: &Rc<VulkanImage>,
+        client_mem: &Rc<ClientMemOffset>,
+        res: Result<VulkanStagingBuffer, VulkanError>,
+    ) -> Result<(), VulkanError> {
+        let staging = Rc::new(res?);
+        let data = self.async_data.as_ref().unwrap();
+        data.staging.set(Some(staging.clone()));
+        let copies = &*data.regions.borrow();
+        self.async_upload_initiate_copy(img, data, &staging, copies, client_mem)
+    }
+
+    fn async_upload_initiate_copy(
+        &self,
+        img: &Rc<VulkanImage>,
+        data: &VulkanShmImageAsyncData,
+        staging: &VulkanStagingBuffer,
+        copies: &[BufferImageCopy2],
+        client_mem: &Rc<ClientMemOffset>,
+    ) -> Result<(), VulkanError> {
+        img.renderer.check_defunct()?;
+
+        let id = img.renderer.allocate_point();
+        let pending;
+        if client_mem.pool().sigbus_impossible() {
+            let mut job = data.copy_job.take().unwrap_or_else(|| {
+                Box::new(CopyUploadJob {
+                    img: None,
+                    id,
+                    _mem: None,
+                    work: unsafe { ImgCopyWork::new() },
+                })
+            });
+            job.id = id;
+            job.img = Some(img.clone());
+            job._mem = Some(client_mem.clone());
+            job.work.src = client_mem.ptr() as _;
+            job.work.dst = staging.allocation.mem.unwrap();
+            job.work.width = img.width as _;
+            job.work.stride = img.stride as _;
+            job.work.bpp = self.shm_info.bpp as _;
+            job.work.rects.clear();
+            for copy in copies {
+                job.work.rects.push(
+                    Rect::new_sized(
+                        copy.image_offset.x as _,
+                        copy.image_offset.y as _,
+                        copy.image_extent.width as _,
+                        copy.image_extent.height as _,
+                    )
+                    .unwrap(),
+                );
+            }
+            pending = data.cpu.submit(job);
+        } else {
+            let mut min_offset = client_mem.ptr().len() as u64;
+            let mut max_offset = 0;
+            for copy in copies {
+                min_offset = min_offset.min(copy.buffer_offset);
+                let len = img.stride * (copy.image_extent.height - 1)
+                    + copy.image_extent.width * self.shm_info.bpp;
+                max_offset = max_offset.max(copy.buffer_offset + len as u64);
+            }
+            let mut job = data.io_job.take().unwrap_or_else(|| {
+                Box::new(IoUploadJob {
+                    img: None,
+                    id,
+                    _mem: None,
+                    work: unsafe { ReadWriteWork::new() },
+                    fd: None,
+                })
+            });
+            job.id = id;
+            job.img = Some(img.clone());
+            job._mem = Some(client_mem.clone());
+            job.fd = Some(client_mem.pool().fd().clone());
+            unsafe {
+                let config = job.work.config();
+                config.fd = client_mem.pool().fd().raw();
+                config.offset = client_mem.offset() + min_offset as usize;
+                config.ptr = staging.allocation.mem.unwrap().add(min_offset as _);
+                config.len = max_offset.saturating_sub(min_offset) as usize;
+                config.write = false;
+            }
+            pending = data.cpu.submit(job);
+        }
+
+        img.renderer.pending_cpu_jobs.set(id, pending);
+
+        Ok(())
+    }
+
+    fn async_upload_copy_buffer_to_image(
+        &self,
+        img: &Rc<VulkanImage>,
+        data: &VulkanShmImageAsyncData,
+        res: Result<(), ReadWriteJobError>,
+    ) -> Result<(), VulkanError> {
+        if let Err(e) = res {
+            return Err(VulkanError::AsyncCopyToStaging(e));
+        }
+        img.renderer.check_defunct()?;
+        let regions = &*data.regions.borrow();
+        let staging = data.staging.get().unwrap();
+        staging.upload(|_, _| ())?;
+        let Some((cmd, fence, sync_file, point)) =
+            self.submit_buffer_to_image_copy(img, &staging, regions)?
+        else {
+            return Ok(());
+        };
+        let future = img.renderer.eng.spawn(await_async_upload(
             point,
             img.clone(),
             cmd,
-            release_sync_file,
-            release_fence,
-            staging,
+            fence,
+            sync_file,
         ));
         img.renderer.pending_uploads.set(point, future);
         Ok(())
     }
 }

-async fn await_upload(
+pub(super) struct IoUploadJob {
+    img: Option<Rc<VulkanImage>>,
+    id: u64,
+    _mem: Option<Rc<ClientMemOffset>>,
+    fd: Option<Rc<OwnedFd>>,
+    work: ReadWriteWork,
+}
+
+pub(super) struct CopyUploadJob {
+    img: Option<Rc<VulkanImage>>,
+    id: u64,
+    _mem: Option<Rc<ClientMemOffset>>,
+    work: ImgCopyWork,
+}
+
+impl CpuJob for IoUploadJob {
+    fn work(&mut self) -> &mut dyn CpuWork {
+        &mut self.work
+    }
+
+    fn completed(mut self: Box<Self>) {
+        self._mem = None;
+        self.fd = None;
+        let img = self.img.take().unwrap();
+        let res = self.work.config().result.take().unwrap();
+        complete_async_upload(&img, self.id, res, |data| data.io_job.set(Some(self)));
+    }
+}
+
+impl CpuJob for CopyUploadJob {
+    fn work(&mut self) -> &mut dyn CpuWork {
+        &mut self.work
+    }
+
+    fn completed(mut self: Box<Self>) {
+        self._mem = None;
+        let img = self.img.take().unwrap();
+        complete_async_upload(&img, self.id, Ok(()), |data| data.copy_job.set(Some(self)));
+    }
+}
+
+fn complete_async_upload(
+    img: &Rc<VulkanImage>,
+    id: u64,
+    res: Result<(), ReadWriteJobError>,
+    store: impl FnOnce(&VulkanShmImageAsyncData),
+) {
+    img.renderer.pending_cpu_jobs.remove(&id);
+    let VulkanImageMemory::Internal(shm) = &img.ty else {
+        unreachable!();
+    };
+    let data = shm.async_data.as_ref().unwrap();
+    store(data);
+    if let Err(e) = shm.async_upload_copy_buffer_to_image(img, data, res) {
+        data.complete(Err(e));
+    }
+}
+
+async fn await_async_upload(
     id: u64,
     img: Rc<VulkanImage>,
     buf: Rc<VulkanCommandBuffer>,
-    sync_file: SyncFile,
     _fence: Rc<VulkanFence>,
-    _staging: VulkanStagingBuffer,
+    sync_file: SyncFile,
 ) {
     let res = img.renderer.ring.readable(&sync_file.0).await;
     if let Err(e) = res {
@@ -250,6 +606,11 @@ async fn await_upload(
     }
     img.renderer.command_buffers.push(buf);
     img.renderer.pending_uploads.remove(&id);
+    let VulkanImageMemory::Internal(shm) = &img.ty else {
+        unreachable!();
+    };
+    let data = shm.async_data.as_ref().unwrap();
+    data.complete(Ok(()));
 }

 impl VulkanRenderer {
@@ -261,6 +622,7 @@ impl VulkanRenderer {
         stride: i32,
         data: &[Cell<u8>],
         for_download: bool,
+        cpu_worker: Option<&Rc<CpuWorker>>,
     ) -> Result<Rc<VulkanImage>, VulkanError> {
         let Some(shm_info) = &format.shm_info else {
             return Err(VulkanError::UnsupportedShmFormat(format.name));
@@ -283,7 +645,7 @@ impl VulkanRenderer {
         if width > shm.limits.max_width || height > shm.limits.max_height {
             return Err(VulkanError::ImageTooLarge);
         }
-        let size = stride.checked_mul(height).ok_or(VulkanError::ShmOverflow)?;
+        let size = stride.checked_mul(height).ok_or(VulkanError::ShmOverflow)? as u64;
         let usage = ImageUsageFlags::TRANSFER_SRC
             | match for_download {
                 true => ImageUsageFlags::COLOR_ATTACHMENT,
@@ -335,11 +697,25 @@ impl VulkanRenderer {
                 .create_image_view(&image_view_create_info, None)
         };
         let view = view.map_err(VulkanError::CreateImageView)?;
+        let mut async_data = None;
+        if let Some(cpu) = cpu_worker {
+            async_data = Some(VulkanShmImageAsyncData {
+                busy: Cell::new(false),
+                io_job: Default::default(),
+                copy_job: Default::default(),
+                staging: Default::default(),
+                callback: Default::default(),
+                callback_id: Cell::new(0),
+                regions: Default::default(),
+                cpu: cpu.clone(),
+            });
+        }
         let shm = VulkanShmImage {
-            _size: size as u64,
+            size,
             stride,
             _allocation: allocation,
             shm_info,
+            async_data,
         };
         destroy_image.forget();
         let img = Rc::new(VulkanImage {
@@ -352,6 +728,7 @@ impl VulkanRenderer {
             render_view: None,
             image,
             is_undefined: Cell::new(true),
+            contents_are_undefined: Cell::new(true),
             ty: VulkanImageMemory::Internal(shm),
             render_ops: Default::default(),
             bridge: None,
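For the io_uring path above, the patch collapses all damaged rectangles into one contiguous byte range of the client buffer (min over buffer_offset, max over the end of each rect's last row) and reads that span in a single operation. A standalone sketch of that computation, assuming row-major pixels with `stride` bytes per row and `bpp` bytes per pixel, and rects with height >= 1:

#[derive(Clone, Copy)]
struct DamageRect {
    x: u32,
    y: u32,
    width: u32,
    height: u32,
}

fn byte_span(rects: &[DamageRect], stride: u32, bpp: u32) -> Option<(u64, u64)> {
    let mut span: Option<(u64, u64)> = None;
    for r in rects {
        // First byte of the rect: start of its first row plus the x offset.
        let start = r.y as u64 * stride as u64 + r.x as u64 * bpp as u64;
        // One past the last byte: start of the last row plus the rect's width.
        let end = (r.y + r.height - 1) as u64 * stride as u64
            + (r.x as u64 + r.width as u64) * bpp as u64;
        span = match span {
            None => Some((start, end)),
            Some((s, e)) => Some((s.min(start), e.max(end))),
        };
    }
    span
}

This trades some over-read between distant rects for a single sequential read, which is the right trade when the data has to go through the kernel anyway.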
diff --git a/src/gfx_apis/vulkan/staging.rs b/src/gfx_apis/vulkan/staging.rs
index 7e205a81..5a3faeac 100644
--- a/src/gfx_apis/vulkan/staging.rs
+++ b/src/gfx_apis/vulkan/staging.rs
@@ -1,9 +1,13 @@
 use {
-    crate::gfx_apis::vulkan::{
-        allocator::{VulkanAllocation, VulkanAllocator},
-        device::VulkanDevice,
-        util::OnDrop,
-        VulkanError,
+    crate::{
+        cpu_worker::CpuWorker,
+        gfx_apis::vulkan::{
+            allocator::{VulkanAllocation, VulkanAllocator},
+            device::VulkanDevice,
+            renderer::VulkanRenderer,
+            VulkanError,
+        },
+        utils::on_drop::{OnDrop, OnDrop2},
     },
     ash::vk::{Buffer, BufferCreateInfo, BufferUsageFlags, MappedMemoryRange},
     gpu_alloc::UsageFlags,
@@ -26,24 +30,8 @@ impl VulkanDevice {
         download: bool,
         transient: bool,
     ) -> Result<VulkanStagingBuffer, VulkanError> {
-        let mut vk_usage = BufferUsageFlags::empty();
-        let mut usage = UsageFlags::empty();
-        if upload {
-            vk_usage |= BufferUsageFlags::TRANSFER_SRC;
-            usage |= UsageFlags::UPLOAD;
-        }
-        if download {
-            vk_usage |= BufferUsageFlags::TRANSFER_DST;
-            usage |= UsageFlags::DOWNLOAD;
-        }
-        if transient {
-            usage |= UsageFlags::TRANSIENT;
-        }
-        let buffer = {
-            let create_info = BufferCreateInfo::default().size(size).usage(vk_usage);
-            let buffer = unsafe { self.device.create_buffer(&create_info, None) };
-            buffer.map_err(VulkanError::CreateBuffer)?
-        };
+        let (vk_usage, usage) = get_usage(upload, download, transient);
+        let buffer = self.create_buffer(size, vk_usage)?;
         let destroy_buffer = OnDrop(|| unsafe { self.device.destroy_buffer(buffer, None) });
         let memory_requirements = unsafe { self.device.get_buffer_memory_requirements(buffer) };
         let allocation = allocator.alloc(&memory_requirements, usage, true)?;
@@ -62,6 +50,72 @@ impl VulkanDevice {
             size,
         })
     }
+
+    pub(super) fn create_shm_staging(
+        self: &Rc<Self>,
+        renderer: &Rc<VulkanRenderer>,
+        cpu: &Rc<CpuWorker>,
+        size: u64,
+        upload: bool,
+        download: bool,
+        cb: impl FnOnce(Result<VulkanStagingBuffer, VulkanError>) + 'static,
+    ) -> Result<(), VulkanError> {
+        let (vk_usage, usage) = get_usage(upload, download, false);
+        let buffer = self.create_buffer(size, vk_usage)?;
+        let memory_requirements = unsafe { self.device.get_buffer_memory_requirements(buffer) };
+        let slf = self.clone();
+        let destroy_buffer =
+            OnDrop2::new(move || unsafe { slf.device.destroy_buffer(buffer, None) });
+        let slf = self.clone();
+        let finish_allocation = move |res| {
+            let allocation: VulkanAllocation = res?;
+            {
+                let res = unsafe {
+                    slf.device
+                        .bind_buffer_memory(buffer, allocation.memory, allocation.offset)
+                };
+                res.map_err(VulkanError::BindBufferMemory)?;
+            }
+            destroy_buffer.forget();
+            Ok(VulkanStagingBuffer {
+                device: slf.clone(),
+                allocation,
+                buffer,
+                size,
+            })
+        };
+        renderer.shm_allocator.async_alloc(
+            renderer,
+            cpu,
+            memory_requirements,
+            usage,
+            true,
+            move |res| cb(finish_allocation(res)),
+        )
+    }
+
+    fn create_buffer(&self, size: u64, usage: BufferUsageFlags) -> Result<Buffer, VulkanError> {
+        let create_info = BufferCreateInfo::default().size(size).usage(usage);
+        let buffer = unsafe { self.device.create_buffer(&create_info, None) };
+        buffer.map_err(VulkanError::CreateBuffer)
+    }
+}
+
+fn get_usage(upload: bool, download: bool, transient: bool) -> (BufferUsageFlags, UsageFlags) {
+    let mut vk_usage = BufferUsageFlags::empty();
+    let mut usage = UsageFlags::empty();
+    if upload {
+        vk_usage |= BufferUsageFlags::TRANSFER_SRC;
+        usage |= UsageFlags::UPLOAD;
+    }
+    if download {
+        vk_usage |= BufferUsageFlags::TRANSFER_DST;
+        usage |= UsageFlags::DOWNLOAD;
+    }
+    if transient {
+        usage |= UsageFlags::TRANSIENT;
+    }
+    (vk_usage, usage)
 }

 impl VulkanStagingBuffer {
@@ -70,27 +124,31 @@ impl VulkanStagingBuffer {
     pub fn upload<T, F>(&self, f: F) -> Result<T, VulkanError>
     where
         F: FnOnce(*mut u8, usize) -> T,
     {
         let t = f(self.allocation.mem.unwrap(), self.size as usize);
-        let range = self.range();
-        let res = unsafe { self.device.device.flush_mapped_memory_ranges(&[range]) };
-        res.map_err(VulkanError::FlushMemory).map(|_| t)
+        if let Some(mask) = self.allocation.coherency_mask {
+            let range = self.incoherent_range(mask);
+            let res = unsafe { self.device.device.flush_mapped_memory_ranges(&[range]) };
+            res.map_err(VulkanError::FlushMemory)?;
+        }
+        Ok(t)
     }

     pub fn download<T, F>(&self, f: F) -> Result<T, VulkanError>
     where
         F: FnOnce(*const u8, usize) -> T,
     {
-        let range = self.range();
-        let res = unsafe { self.device.device.invalidate_mapped_memory_ranges(&[range]) };
-        res.map_err(VulkanError::FlushMemory)?;
+        if let Some(mask) = self.allocation.coherency_mask {
+            let range = self.incoherent_range(mask);
+            let res = unsafe { self.device.device.invalidate_mapped_memory_ranges(&[range]) };
+            res.map_err(VulkanError::FlushMemory)?;
+        }
         Ok(f(self.allocation.mem.unwrap(), self.size as usize))
     }

-    fn range(&self) -> MappedMemoryRange {
-        let atom_mask = self.allocation.allocator.non_coherent_atom_mask;
+    fn incoherent_range(&self, mask: u64) -> MappedMemoryRange {
         MappedMemoryRange::default()
             .memory(self.allocation.memory)
-            .offset(self.allocation.offset & !atom_mask)
-            .size((self.allocation.size + atom_mask) & !atom_mask)
+            .offset(self.allocation.offset & !mask)
+            .size((self.allocation.size + mask) & !mask)
     }
 }
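Flushes and invalidates now happen only for allocations with a coherency mask, i.e. memory that is not HOST_COHERENT, and the range is aligned to the device's non-coherent atom size. With mask = atom_size - 1 and a power-of-two atom size, the rounding is pure bit arithmetic, illustrated standalone below (align_range is a hypothetical helper, not part of the patch):

fn align_range(offset: u64, size: u64, atom_size: u64) -> (u64, u64) {
    debug_assert!(atom_size.is_power_of_two());
    let mask = atom_size - 1;
    let aligned_offset = offset & !mask; // round down to an atom boundary
    let aligned_size = (size + mask) & !mask; // round up to a whole atom
    (aligned_offset, aligned_size)
}

fn main() {
    // With 64-byte atoms, a 100-byte range at offset 70 becomes
    // offset 64 and size 128.
    assert_eq!(align_range(70, 100, 64), (64, 128));
}

Skipping the call entirely on coherent memory saves a vkFlushMappedMemoryRanges per upload, which matters on the hot shm-texture path.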
diff --git a/src/gfx_apis/vulkan/util.rs b/src/gfx_apis/vulkan/util.rs
deleted file mode 100644
index a7a12d79..00000000
--- a/src/gfx_apis/vulkan/util.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use std::mem;
-
-pub struct OnDrop<F>(pub F)
-where
-    F: FnMut() + Copy;
-
-impl<F: FnMut() + Copy> OnDrop<F> {
-    pub fn forget(self) {
-        mem::forget(self);
-    }
-}
-
-impl<F: FnMut() + Copy> Drop for OnDrop<F> {
-    fn drop(&mut self) {
-        (self.0)();
-    }
-}
diff --git a/src/ifs/jay_input.rs b/src/ifs/jay_input.rs
index f0362763..2e6d26fe 100644
--- a/src/ifs/jay_input.rs
+++ b/src/ifs/jay_input.rs
@@ -165,11 +165,18 @@ impl JayInput {
         }
     }

-    fn set_keymap_impl<F>(&self, keymap: &OwnedFd, len: u32, f: F) -> Result<(), JayInputError>
+    fn set_keymap_impl<F>(&self, keymap: &Rc<OwnedFd>, len: u32, f: F) -> Result<(), JayInputError>
     where
         F: FnOnce(&Rc<XkbKeymap>) -> Result<(), JayInputError>,
     {
-        let cm = Rc::new(ClientMem::new(keymap.raw(), len as _, true)?).offset(0);
+        let cm = Rc::new(ClientMem::new(
+            keymap,
+            len as _,
+            true,
+            Some(&self.client),
+            None,
+        )?)
+        .offset(0);
         let mut map = vec![];
         cm.read(&mut map)?;
         self.or_error(|| {
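The Vulkan-local util.rs is gone; its OnDrop guard now lives in utils::on_drop alongside the new OnDrop2 used by create_shm_staging. For readers unfamiliar with the idiom, here is a minimal sketch of such a scope guard (illustrative, not the exact utils::on_drop code): run a cleanup closure on drop unless the happy path disarms it first.

struct Guard<F: FnOnce()>(Option<F>);

impl<F: FnOnce()> Guard<F> {
    fn new(f: F) -> Self {
        Self(Some(f))
    }

    // Disarm the guard once ownership of the resource has been handed off.
    fn forget(mut self) {
        self.0 = None;
    }
}

impl<F: FnOnce()> Drop for Guard<F> {
    fn drop(&mut self) {
        if let Some(f) = self.0.take() {
            f();
        }
    }
}

fn main() {
    let g = Guard::new(|| println!("cleaned up"));
    // ... fallible setup; any early return would trigger the guard ...
    g.forget(); // success path: cleanup skipped
}

This is why the diff can create a VkBuffer before the asynchronous allocation completes: the destroy_buffer guard fires on every error path and is forgotten exactly once on success.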
diff --git a/src/ifs/wl_buffer.rs b/src/ifs/wl_buffer.rs
index ffe03415..3068d572 100644
--- a/src/ifs/wl_buffer.rs
+++ b/src/ifs/wl_buffer.rs
@@ -7,7 +7,7 @@ use {
         ifs::wl_surface::WlSurface,
         leaks::Tracker,
         object::{Object, Version},
-        rect::Rect,
+        rect::{Rect, Region},
         theme::Color,
         utils::errorfmt::ErrorFmt,
         video::dmabuf::DmaBuf,
@@ -22,7 +22,7 @@ use {

 pub enum WlBufferStorage {
     Shm {
-        mem: ClientMemOffset,
+        mem: Rc<ClientMemOffset>,
         stride: i32,
     },
     Dmabuf {
@@ -41,6 +41,7 @@ pub struct WlBuffer {
     pub dmabuf: Option<DmaBuf>,
     render_ctx_version: Cell<u32>,
     pub storage: RefCell<Option<WlBufferStorage>>,
+    shm: bool,
    pub color: Option<Color>,
     width: i32,
     height: i32,
@@ -52,6 +53,10 @@ impl WlBuffer {
         self.destroyed.get()
     }

+    pub fn is_shm(&self) -> bool {
+        self.shm
+    }
+
     pub fn new_dmabuf(
         id: WlBufferId,
         client: &Rc<Client>,
@@ -76,6 +81,7 @@ impl WlBuffer {
                 tex: None,
                 fb: None,
             })),
+            shm: false,
             tracker: Default::default(),
             color: None,
         }
@@ -100,7 +106,7 @@ impl WlBuffer {
         if required > mem.len() as u64 {
             return Err(WlBufferError::OutOfBounds);
         }
-        let mem = mem.offset(offset);
+        let mem = Rc::new(mem.offset(offset));
         let min_row_size = width as u64 * shm_info.bpp as u64;
         if (stride as u64) < min_row_size {
             return Err(WlBufferError::StrideTooSmall);
@@ -114,6 +120,7 @@ impl WlBuffer {
             dmabuf: None,
             render_ctx_version: Cell::new(client.state.render_ctx_version.get()),
             storage: RefCell::new(Some(WlBufferStorage::Shm { mem, stride })),
+            shm: true,
             width,
             height,
             tracker: Default::default(),
@@ -138,6 +145,7 @@ impl WlBuffer {
             dmabuf: None,
             render_ctx_version: Cell::new(client.state.render_ctx_version.get()),
             storage: RefCell::new(None),
+            shm: false,
             width: 1,
             height: 1,
             tracker: Default::default(),
@@ -153,7 +161,7 @@ impl WlBuffer {
         let had_texture = self.reset_gfx_objects(surface);
         if had_texture {
             if let Some(surface) = surface {
-                self.update_texture_or_log(surface, None);
+                self.update_texture_or_log(surface, true);
             }
         }
     }
@@ -166,7 +174,10 @@ impl WlBuffer {
         let had_texture = match s {
             WlBufferStorage::Shm { .. } => {
                 return match surface {
-                    Some(s) => s.shm_texture.take().is_some(),
+                    Some(s) => {
+                        s.shm_textures.back().tex.take();
+                        s.shm_textures.front().tex.take().is_some()
+                    }
                     None => false,
                 };
             }
@@ -201,24 +212,24 @@ impl WlBuffer {
         match &*self.storage.borrow() {
             None => None,
             Some(s) => match s {
-                WlBufferStorage::Shm { .. } => surface.shm_texture.get(),
+                WlBufferStorage::Shm { .. } => surface
+                    .shm_textures
+                    .front()
+                    .tex
+                    .get()
+                    .map(|t| t.into_texture()),
                 WlBufferStorage::Dmabuf { tex, .. } => tex.clone(),
             },
         }
     }

-    pub fn update_texture_or_log(&self, surface: &WlSurface, damage: Option<&[Rect]>) {
-        if let Err(e) = self.update_texture(surface, damage) {
+    pub fn update_texture_or_log(&self, surface: &WlSurface, sync_shm: bool) {
+        if let Err(e) = self.update_texture(surface, sync_shm) {
             log::warn!("Could not update texture: {}", ErrorFmt(e));
         }
     }

-    fn update_texture(
-        &self,
-        surface: &WlSurface,
-        damage: Option<&[Rect]>,
-    ) -> Result<(), WlBufferError> {
-        let old_shm_texture = surface.shm_texture.take();
+    fn update_texture(&self, surface: &WlSurface, sync_shm: bool) -> Result<(), WlBufferError> {
         let storage = &mut *self.storage.borrow_mut();
         let storage = match storage {
             Some(s) => s,
@@ -226,19 +237,19 @@ impl WlBuffer {
         };
         match storage {
             WlBufferStorage::Shm { mem, stride } => {
-                if let Some(ctx) = self.client.state.render_ctx.get() {
-                    let tex = mem.access(|mem| {
-                        ctx.shmem_texture(
-                            old_shm_texture,
-                            mem,
+                if sync_shm {
+                    if let Some(ctx) = self.client.state.render_ctx.get() {
+                        let tex = ctx.async_shmem_texture(
                             self.format,
                             self.width,
                             self.height,
                             *stride,
-                            damage,
-                        )
-                    })??;
-                    surface.shm_texture.set(Some(tex));
+                            &self.client.state.cpu_worker,
+                        )?;
+                        mem.access(|mem| tex.clone().sync_upload(mem, Region::new2(self.rect)))??;
+                        surface.shm_textures.front().tex.set(Some(tex));
+                        surface.shm_textures.front().damage.clear();
+                    }
                 }
             }
             WlBufferStorage::Dmabuf { img, tex, .. } => {
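The buffer code now programs against a two-mode upload contract: the same texture can be filled synchronously (the fallback above, used e.g. after a render-context change) or asynchronously via a CPU worker with a completion callback (used by the commit timeline further down). A hedged sketch of that contract; the real trait is AsyncShmGfxTexture in gfx_api and its signatures differ in detail (e.g. async_upload returns a pending-upload handle):

use std::rc::Rc;

// Simplified stand-ins for the real gfx_api types.
type GfxError = String;
struct Region;

trait UploadCallback {
    fn completed(self: Rc<Self>, res: Result<(), GfxError>);
}

trait ShmTexture {
    // Blocking path: copy the damaged part of `mem` before returning.
    fn sync_upload(self: Rc<Self>, mem: &[u8], damage: Region) -> Result<(), GfxError>;

    // Non-blocking path: schedule the copy on a CPU worker and invoke `cb`
    // once the staging copy and the GPU transfer have both finished.
    fn async_upload(
        self: Rc<Self>,
        cb: Rc<dyn UploadCallback>,
        mem: &[u8],
        damage: Region,
    ) -> Result<(), GfxError>;
}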
diff --git a/src/ifs/wl_seat/zwp_virtual_keyboard_v1.rs b/src/ifs/wl_seat/zwp_virtual_keyboard_v1.rs
index 2dde71f4..e100ce3e 100644
--- a/src/ifs/wl_seat/zwp_virtual_keyboard_v1.rs
+++ b/src/ifs/wl_seat/zwp_virtual_keyboard_v1.rs
@@ -56,9 +56,15 @@ impl ZwpVirtualKeyboardV1RequestHandler for ZwpVirtualKeyboardV1 {
         if req.size > MAX_SIZE {
             return Err(ZwpVirtualKeyboardV1Error::OversizedKeymap);
         }
-        let client_mem = ClientMem::new(req.fd.raw(), req.size as usize - 1, true)
-            .map(Rc::new)
-            .map_err(ZwpVirtualKeyboardV1Error::MapKeymap)?;
+        let client_mem = ClientMem::new(
+            &req.fd,
+            req.size as usize - 1,
+            true,
+            Some(&self.client),
+            None,
+        )
+        .map(Rc::new)
+        .map_err(ZwpVirtualKeyboardV1Error::MapKeymap)?;
         let mut map = vec![];
         client_mem
             .offset(0)
diff --git a/src/ifs/wl_shm_pool.rs b/src/ifs/wl_shm_pool.rs
index 30687f6c..b03dd608 100644
--- a/src/ifs/wl_shm_pool.rs
+++ b/src/ifs/wl_shm_pool.rs
@@ -34,7 +34,13 @@ impl WlShmPool {
         Ok(Self {
             id,
             client: client.clone(),
-            mem: CloneCell::new(Rc::new(ClientMem::new(fd.raw(), len, false)?)),
+            mem: CloneCell::new(Rc::new(ClientMem::new(
+                &fd,
+                len,
+                false,
+                Some(client),
+                Some(&client.state.cpu_worker),
+            )?)),
             fd,
             tracker: Default::default(),
             version,
@@ -82,9 +88,11 @@ impl WlShmPoolRequestHandler for WlShmPool {
             return Err(WlShmPoolError::CannotShrink);
         }
         self.mem.set(Rc::new(ClientMem::new(
-            self.fd.raw(),
+            &self.fd,
             req.size as usize,
             false,
+            Some(&self.client),
+            Some(&self.client.state.cpu_worker),
        )?));
         Ok(())
     }
diff --git a/src/ifs/wl_surface.rs b/src/ifs/wl_surface.rs
index d04b82b2..63b9101b 100644
--- a/src/ifs/wl_surface.rs
+++ b/src/ifs/wl_surface.rs
@@ -23,7 +23,8 @@ use {
         drm_feedback::DrmFeedback,
         fixed::Fixed,
         gfx_api::{
-            AcquireSync, BufferResv, BufferResvUser, GfxTexture, ReleaseSync, SampleRect, SyncFile,
+            AcquireSync, AsyncShmGfxTexture, BufferResv, BufferResvUser, GfxError, ReleaseSync,
+            SampleRect, SyncFile,
         },
         ifs::{
             wl_buffer::WlBuffer,
@@ -59,16 +60,16 @@ use {
         },
         leaks::Tracker,
         object::{Object, Version},
-        rect::{Rect, Region},
+        rect::{DamageQueue, Rect, Region},
         renderer::Renderer,
         tree::{
             ContainerNode, FindTreeResult, FoundNode, Node, NodeId, NodeVisitor, NodeVisitorBase,
             OutputNode, OutputNodeId, PlaceholderNode, ToplevelNode,
         },
         utils::{
-            cell_ext::CellExt, clonecell::CloneCell, copyhashmap::CopyHashMap, errorfmt::ErrorFmt,
-            linkedlist::LinkedList, numcell::NumCell, smallmap::SmallMap,
-            transform_ext::TransformExt,
+            cell_ext::CellExt, clonecell::CloneCell, copyhashmap::CopyHashMap,
+            double_buffered::DoubleBuffered, errorfmt::ErrorFmt, linkedlist::LinkedList,
+            numcell::NumCell, smallmap::SmallMap, transform_ext::TransformExt,
         },
         video::{
             dmabuf::DMA_BUF_SYNC_READ,
@@ -249,6 +250,11 @@ impl BufferResv for SurfaceBuffer {
     }
 }

+pub struct SurfaceShmTexture {
+    pub tex: CloneCell<Option<Rc<dyn AsyncShmGfxTexture>>>,
+    pub damage: DamageQueue,
+}
+
 pub struct WlSurface {
     pub id: WlSurfaceId,
     pub node_id: SurfaceNodeId,
@@ -271,7 +277,7 @@ pub struct WlSurface {
     pub buffer: CloneCell<Option<Rc<SurfaceBuffer>>>,
     buffer_presented: Cell<bool>,
     buffer_had_frame_request: Cell<bool>,
-    pub shm_texture: CloneCell<Option<Rc<dyn GfxTexture>>>,
+    pub shm_textures: DoubleBuffered<SurfaceShmTexture>,
     pub buf_x: NumCell<i32>,
     pub buf_y: NumCell<i32>,
     pub children: RefCell<Option<Box<SurfaceChildren>>>,
@@ -584,7 +590,10 @@ impl WlSurface {
             buffer: Default::default(),
             buffer_presented: Default::default(),
             buffer_had_frame_request: Default::default(),
-            shm_texture: Default::default(),
+            shm_textures: DoubleBuffered::new(DamageQueue::new().map(|damage| SurfaceShmTexture {
+                tex: Default::default(),
+                damage,
+            })),
             buf_x: Default::default(),
             buf_y: Default::default(),
             children: Default::default(),
@@ -909,7 +918,7 @@ impl WlSurfaceRequestHandler for WlSurface {
             *children = None;
         }
         self.buffer.set(None);
-        self.shm_texture.take();
+        self.reset_shm_textures();
         if let Some(xwayland_serial) = self.xwayland_serial.get() {
             self.client
                 .surfaces_by_xwayland_serial
@@ -1077,11 +1086,13 @@ impl WlSurface {
             old_raw_size = Some(buffer.buffer.rect);
         }
         if let Some(buffer) = buffer_change {
-            let damage = match pending.damage_full || pending.surface_damage.is_not_empty() {
-                true => None,
-                false => Some(&pending.buffer_damage[..]),
-            };
-            buffer.update_texture_or_log(self, damage);
+            if buffer.is_shm() {
+                self.shm_textures.flip();
+                self.shm_textures.front().damage.clear();
+            } else {
+                self.reset_shm_textures();
+            }
+            buffer.update_texture_or_log(self, false);
             let (sync, release_sync) = match pending.explicit_sync {
                 false => (AcquireSync::Implicit, ReleaseSync::Implicit),
                 true => (AcquireSync::Unnecessary, ReleaseSync::Explicit),
@@ -1103,7 +1114,7 @@ impl WlSurface {
                 self.buffer_had_frame_request.set(false);
             }
         } else {
-            self.shm_texture.take();
+            self.reset_shm_textures();
             self.buf_x.set(0);
             self.buf_y.set(0);
             for (_, cursor) in &self.cursors {
@@ -1321,6 +1332,13 @@ impl WlSurface {
         Ok(())
     }

+    pub fn reset_shm_textures(&self) {
+        for tex in &*self.shm_textures {
+            tex.tex.take();
+            tex.damage.clear();
+        }
+    }
+
     fn apply_damage(&self, pending: &PendingState) {
         let bounds = self.toplevel.get().map(|tl| tl.node_absolute_position());
         let pos = self.buffer_abs_pos.get();
@@ -1915,6 +1933,12 @@ pub enum WlSurfaceError {
     UnexpectedSyncPoints,
     #[error("The supplied region is invalid")]
     InvalidRect,
+    #[error("There is no render context")]
+    NoRenderContext,
+    #[error("Could not create a shm texture")]
+    CreateAsyncShmTexture(#[source] GfxError),
+    #[error("Could not prepare upload to a shm texture")]
+    PrepareAsyncUpload(#[source] GfxError),
 }
 efrom!(WlSurfaceError, ClientError);
 efrom!(WlSurfaceError, XdgSurfaceError);
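The single shm_texture slot becomes a front/back pair: asynchronous uploads land in the back texture while the front one is still being sampled, and apply_state flips them once the commit is applied. A hedged sketch of the front/back scheme (illustrative; utils::double_buffered is the real implementation and its API differs):

use std::cell::Cell;

struct DoubleBuffered<T> {
    slots: [T; 2],
    front: Cell<usize>,
}

impl<T> DoubleBuffered<T> {
    fn new(slots: [T; 2]) -> Self {
        Self {
            slots,
            front: Cell::new(0),
        }
    }

    // The texture currently used for rendering.
    fn front(&self) -> &T {
        &self.slots[self.front.get()]
    }

    // The texture uploads are staged into.
    fn back(&self) -> &T {
        &self.slots[1 - self.front.get()]
    }

    // Promote the freshly uploaded back texture.
    fn flip(&self) {
        self.front.set(1 - self.front.get());
    }
}

Each slot also carries its own damage queue, which is why reset_shm_textures clears both the texture and the damage of both slots.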
diff --git a/src/ifs/wl_surface/commit_timeline.rs b/src/ifs/wl_surface/commit_timeline.rs
index a150b6fa..8ec7a1b0 100644
--- a/src/ifs/wl_surface/commit_timeline.rs
+++ b/src/ifs/wl_surface/commit_timeline.rs
@@ -1,6 +1,10 @@
 use {
     crate::{
-        ifs::wl_surface::{PendingState, WlSurface, WlSurfaceError},
+        gfx_api::{AsyncShmGfxTextureCallback, GfxError, PendingShmUpload},
+        ifs::{
+            wl_buffer::WlBufferStorage,
+            wl_surface::{PendingState, WlSurface, WlSurfaceError},
+        },
         utils::{
             clonecell::CloneCell,
             copyhashmap::CopyHashMap,
@@ -14,13 +18,14 @@ use {
             DrmError,
         },
     },
-    isnt::std_1::primitive::IsntSliceExt,
+    isnt::std_1::{primitive::IsntSliceExt, vec::IsntVecExt},
     smallvec::SmallVec,
     std::{
         cell::{Cell, RefCell},
         mem,
-        ops::{Deref, DerefMut},
+        ops::DerefMut,
         rc::Rc,
+        slice,
     },
     thiserror::Error,
 };
@@ -76,6 +81,8 @@ pub enum CommitTimelineError {
     Wait(#[source] DrmError),
     #[error("The client has too many pending commits")]
     Depth,
+    #[error("Could not upload a shm texture")]
+    ShmUpload(#[source] GfxError),
 }

 impl CommitTimelines {
@@ -119,6 +126,7 @@ fn break_loops(list: &LinkedList<Entry>) {
         entry.link.take();
         if let EntryKind::Commit(c) = &entry.kind {
             c.wait_handles.take();
+            *c.shm_upload.borrow_mut() = ShmUploadState::None;
         }
     }
 }
@@ -145,7 +153,9 @@ impl CommitTimeline {
     ) -> Result<(), CommitTimelineError> {
         let mut points = SmallVec::new();
         consume_acquire_points(pending, &mut points);
-        if points.is_empty() && self.own_timeline.entries.is_empty() {
+        let mut pending_uploads = 0;
+        count_shm_uploads(pending, &mut pending_uploads);
+        if points.is_empty() && pending_uploads == 0 && self.own_timeline.entries.is_empty() {
             return surface
                 .apply_state(pending)
                 .map_err(CommitTimelineError::ImmediateCommit);
@@ -162,23 +172,35 @@ impl CommitTimeline {
                 pending: RefCell::new(mem::take(pending)),
                 sync_obj: NumCell::new(points.len()),
                 wait_handles: Cell::new(Default::default()),
+                pending_uploads: NumCell::new(pending_uploads),
+                shm_upload: RefCell::new(ShmUploadState::None),
             }),
         );
-        if points.is_not_empty() {
-            let mut wait_handles = SmallVec::new();
-            let noderef = Rc::new(noderef);
-            for (sync_obj, point) in points {
-                let handle = self
-                    .shared
-                    .wfs
-                    .wait(&sync_obj, point, true, noderef.clone())
-                    .map_err(CommitTimelineError::RegisterWait)?;
-                wait_handles.push(handle);
-            }
+        let mut needs_flush = false;
+        if points.is_not_empty() || pending_uploads > 0 {
+            let noderef = Rc::new(noderef.clone());
             let EntryKind::Commit(commit) = &noderef.kind else {
                 unreachable!();
             };
-            commit.wait_handles.set(wait_handles);
+            if points.is_not_empty() {
+                let mut wait_handles = SmallVec::new();
+                for (sync_obj, point) in points {
+                    let handle = self
+                        .shared
+                        .wfs
+                        .wait(&sync_obj, point, true, noderef.clone())
+                        .map_err(CommitTimelineError::RegisterWait)?;
+                    wait_handles.push(handle);
+                }
+                commit.wait_handles.set(wait_handles);
+            }
+            if pending_uploads > 0 {
+                *commit.shm_upload.borrow_mut() = ShmUploadState::Todo(noderef.clone());
+                needs_flush = true;
+            }
+        }
+        if needs_flush && noderef.prev().is_none() {
+            flush_from(noderef.clone()).map_err(CommitTimelineError::DelayedCommit)?;
        }
         Ok(())
     }
@@ -194,12 +216,33 @@ impl SyncObjWaiter for NodeRef<Entry> {
             return;
         }
         commit.sync_obj.fetch_sub(1);
-        if let Err(e) = flush_from(self.deref().clone()) {
+        flush_commit(&self, commit);
+    }
+}
+
+fn flush_commit(node_ref: &NodeRef<Entry>, commit: &Commit) {
+    if let Err(e) = flush_from(node_ref.clone()) {
+        commit
+            .surface
+            .client
+            .error(CommitTimelineError::DelayedCommit(e));
+    }
+}
+
+impl AsyncShmGfxTextureCallback for NodeRef<Entry> {
+    fn completed(self: Rc<Self>, res: Result<(), GfxError>) {
+        let EntryKind::Commit(commit) = &self.kind else {
+            unreachable!();
+        };
+        if let Err(e) = res {
             commit
                 .surface
                 .client
-                .error(CommitTimelineError::DelayedCommit(e));
+                .error(CommitTimelineError::ShmUpload(e));
+            return;
         }
+        commit.pending_uploads.fetch_sub(1);
+        flush_commit(&self, commit);
     }
 }
@@ -216,11 +259,19 @@ enum EntryKind {
     Gc(CommitTimelineId),
 }

+enum ShmUploadState {
+    None,
+    Todo(Rc<NodeRef<Entry>>),
+    Scheduled(#[expect(dead_code)] SmallVec<[PendingShmUpload; 1]>),
+}
+
 struct Commit {
     surface: Rc<WlSurface>,
     pending: RefCell<Box<PendingState>>,
     sync_obj: NumCell<usize>,
     wait_handles: Cell<SmallVec<[SyncObjWaitHandle; 1]>>,
+    pending_uploads: NumCell<usize>,
+    shm_upload: RefCell<ShmUploadState>,
 }

 fn flush_from(mut point: NodeRef<Entry>) -> Result<(), WlSurfaceError> {
@@ -243,7 +294,17 @@ impl NodeRef<Entry> {
         }
         match &self.kind {
             EntryKind::Commit(c) => {
+                let mut has_unmet_dependencies = false;
                 if c.sync_obj.get() > 0 {
+                    has_unmet_dependencies = true;
+                }
+                if c.pending_uploads.get() > 0 {
+                    check_shm_uploads(c)?;
+                    if c.pending_uploads.get() > 0 {
+                        has_unmet_dependencies = true;
+                    }
+                }
+                if has_unmet_dependencies {
                     return Ok(false);
                 }
                 c.surface.apply_state(c.pending.borrow_mut().deref_mut())?;
@@ -266,6 +327,90 @@ impl NodeRef<Entry> {
     }
 }

+fn check_shm_uploads(c: &Commit) -> Result<(), WlSurfaceError> {
+    let state = &mut *c.shm_upload.borrow_mut();
+    if let ShmUploadState::Todo(node_ref) = state {
+        let mut pending = SmallVec::new();
+        schedule_async_uploads(node_ref, &c.surface, &c.pending.borrow(), &mut pending)?;
+        c.pending_uploads.set(pending.len());
+        *state = ShmUploadState::Scheduled(pending);
+    }
+    Ok(())
+}
+
+fn schedule_async_uploads(
+    node_ref: &Rc<NodeRef<Entry>>,
+    surface: &WlSurface,
+    pending: &PendingState,
+    uploads: &mut SmallVec<[PendingShmUpload; 1]>,
+) -> Result<(), WlSurfaceError> {
+    if let Some(pending) = schedule_async_upload(node_ref, surface, pending)? {
+        uploads.push(pending);
+    }
+    for ss in pending.subsurfaces.values() {
+        if let Some(state) = &ss.pending.state {
+            schedule_async_uploads(node_ref, &ss.subsurface.surface, state, uploads)?;
+        }
+    }
+    Ok(())
+}
+
+fn schedule_async_upload(
+    node_ref: &Rc<NodeRef<Entry>>,
+    surface: &WlSurface,
+    pending: &PendingState,
+) -> Result<Option<PendingShmUpload>, WlSurfaceError> {
+    let Some(Some(buf)) = &pending.buffer else {
+        return Ok(None);
+    };
+    let Some(WlBufferStorage::Shm { mem, stride, .. }) = &*buf.storage.borrow() else {
+        return Ok(None);
+    };
+    let back = surface.shm_textures.back();
+    let mut back_tex_opt = back.tex.get();
+    if let Some(back_tex) = &back_tex_opt {
+        if !back_tex.compatible_with(buf.format, buf.rect.width(), buf.rect.height(), *stride) {
+            back_tex_opt = None;
+        }
+    }
+    let damage_full = || {
+        back.damage.clear();
+        back.damage.damage(slice::from_ref(&buf.rect));
+    };
+    let back_tex = match back_tex_opt {
+        Some(b) => {
+            if pending.damage_full || pending.surface_damage.is_not_empty() {
+                damage_full();
+            } else {
+                back.damage.damage(&pending.buffer_damage);
+            }
+            b
+        }
+        None => {
+            damage_full();
+            let state = &surface.client.state;
+            let ctx = state
+                .render_ctx
+                .get()
+                .ok_or(WlSurfaceError::NoRenderContext)?;
+            let back_tex = ctx
+                .async_shmem_texture(
+                    buf.format,
+                    buf.rect.width(),
+                    buf.rect.height(),
+                    *stride,
+                    &state.cpu_worker,
+                )
+                .map_err(WlSurfaceError::CreateAsyncShmTexture)?;
+            back.tex.set(Some(back_tex.clone()));
+            back_tex
+        }
+    };
+    back_tex
+        .async_upload(node_ref.clone(), mem, back.damage.get())
+        .map_err(WlSurfaceError::PrepareAsyncUpload)
+}
+
 type Point = (Rc<SyncObj>, SyncObjPoint);

 fn consume_acquire_points(pending: &mut PendingState, points: &mut SmallVec<[Point; 1]>) {
@@ -301,3 +446,16 @@ fn set_effective_timeline(
         }
     }
 }
+
+fn count_shm_uploads(pending: &PendingState, count: &mut usize) {
+    if let Some(Some(buffer)) = &pending.buffer {
+        if buffer.is_shm() {
+            *count += 1;
+        }
+    }
+    for ss in pending.subsurfaces.values() {
+        if let Some(state) = &ss.pending.state {
+            count_shm_uploads(state, count);
+        }
+    }
+}
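A commit may now be blocked by two kinds of dependencies: unsignaled explicit-sync points and unfinished shm uploads; can_apply only proceeds once both counters reach zero, and each completion callback decrements its counter and re-flushes the queue. A minimal sketch of that gating idea (illustrative names, not the timeline's types):

use std::cell::Cell;

struct Gate {
    sync_points: Cell<usize>,
    uploads: Cell<usize>,
}

impl Gate {
    fn ready(&self) -> bool {
        self.sync_points.get() == 0 && self.uploads.get() == 0
    }

    // Called from a sync-point callback; returns true if the commit can
    // now be applied.
    fn sync_point_done(&self) -> bool {
        self.sync_points.set(self.sync_points.get() - 1);
        self.ready()
    }

    // Called from an upload-completion callback.
    fn upload_done(&self) -> bool {
        self.uploads.set(self.uploads.get() - 1);
        self.ready()
    }
}

Note that the uploads are scheduled lazily in check_shm_uploads: the Todo state only turns into Scheduled once the commit reaches the head of the timeline, so a commit queued behind others does not hold a staging buffer prematurely.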
diff --git a/src/io_uring.rs b/src/io_uring.rs
index baf4c934..60c5291a 100644
--- a/src/io_uring.rs
+++ b/src/io_uring.rs
@@ -5,7 +5,8 @@ use {
         io_uring::{
             ops::{
                 accept::AcceptTask, async_cancel::AsyncCancelTask, connect::ConnectTask,
-                poll::PollTask, read_write::ReadWriteTask, recvmsg::RecvmsgTask,
+                poll::PollTask, read_write::ReadWriteTask,
+                read_write_no_cancel::ReadWriteNoCancelTask, recvmsg::RecvmsgTask,
                 sendmsg::SendmsgTask, timeout::TimeoutTask, timeout_link::TimeoutLinkTask,
             },
             pending_result::PendingResults,
@@ -22,7 +23,6 @@ use {
             copyhashmap::CopyHashMap,
             errorfmt::ErrorFmt,
             mmap::{mmap, Mmapped},
-            numcell::NumCell,
             oserror::OsError,
             ptr_ext::{MutPtrExt, PtrExt},
             stack::Stack,
@@ -206,6 +206,7 @@ impl IoUring {
             tasks: Default::default(),
             pending_results: Default::default(),
             cached_read_writes: Default::default(),
+            cached_read_writes_no_cancel: Default::default(),
             cached_cancels: Default::default(),
             cached_polls: Default::default(),
             cached_sendmsg: Default::default(),
@@ -229,6 +230,10 @@ impl IoUring {
         self.ring.kill();
         res
     }
+
+    pub fn cancel(&self, id: IoUringTaskId) {
+        self.ring.cancel_task(id);
+    }
 }

 struct IoUringData {
@@ -254,14 +259,15 @@ struct IoUringData {
     cqes_consumed: AsyncEvent,

-    next: NumCell<u64>,
-    to_encode: SyncQueue<u64>,
-    pending_in_kernel: CopyHashMap<u64, ()>,
-    tasks: CopyHashMap<u64, Box<dyn Task>>,
+    next: IoUringTaskIds,
+    to_encode: SyncQueue<IoUringTaskId>,
+    pending_in_kernel: CopyHashMap<IoUringTaskId, ()>,
+    tasks: CopyHashMap<IoUringTaskId, Box<dyn Task>>,
     pending_results: PendingResults,
     cached_read_writes: Stack<Box<ReadWriteTask>>,
+    cached_read_writes_no_cancel: Stack<Box<ReadWriteNoCancelTask>>,
     cached_cancels: Stack<Box<AsyncCancelTask>>,
     cached_polls: Stack<Box<PollTask>>,
     cached_sendmsg: Stack<Box<SendmsgTask>>,
@@ -276,7 +282,7 @@ struct IoUringData {
 }

 unsafe trait Task {
-    fn id(&self) -> u64;
+    fn id(&self) -> IoUringTaskId;
     fn complete(self: Box<Self>, ring: &IoUringData, res: i32);

     fn encode(&self, sqe: &mut io_uring_sqe);
@@ -347,8 +353,9 @@ impl IoUringData {
             let entry = self.cqmap.deref()[idx].get();
             head = head.wrapping_add(1);
             self.cqhead.deref().store(head, Release);
-            if let Some(pending) = self.tasks.remove(&entry.user_data) {
-                self.pending_in_kernel.remove(&entry.user_data);
+            let id = IoUringTaskId(entry.user_data);
+            if let Some(pending) = self.tasks.remove(&id) {
+                self.pending_in_kernel.remove(&id);
                 pending.complete(self, entry.res);
             }
         }
@@ -384,7 +391,7 @@ impl IoUringData {
             let sqe = self.sqesmap.deref()[idx].get().deref_mut();
             self.sqmap.deref()[idx].set(idx as _);
             *sqe = Default::default();
-            sqe.user_data = id;
+            sqe.user_data = id.raw();
             task.encode(sqe);
             if has_timeout {
                 sqe.flags |= IOSQE_IO_LINK;
@@ -404,11 +411,11 @@ impl IoUringData {
         }
     }

-    fn id_raw(&self) -> u64 {
-        self.next.fetch_add(1)
+    fn id_raw(&self) -> IoUringTaskId {
+        self.next.next()
     }

-    fn cancel_task(&self, id: u64) {
+    fn cancel_task(&self, id: IoUringTaskId) {
         if !self.tasks.contains(&id) {
             return;
         }
@@ -466,8 +473,17 @@ impl IoUringData {
     }
 }

+linear_ids!(IoUringTaskIds, IoUringTaskId, u64);
+
+#[expect(clippy::derivable_impls)]
+impl Default for IoUringTaskId {
+    fn default() -> Self {
+        Self(0)
+    }
+}
+
 struct Cancellable<'a> {
-    id: u64,
+    id: IoUringTaskId,
     data: &'a IoUringData,
 }
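Wrapping the raw u64 user_data in a typed id makes it impossible to confuse task ids with other counters at compile time. A hand-rolled miniature of the newtype-plus-allocator pattern that linear_ids! generates (the real macro emits more, e.g. Display impls; this is only the shape):

use std::cell::Cell;

#[derive(Copy, Clone, PartialEq, Eq, Hash, Default, Debug)]
struct TaskId(u64);

impl TaskId {
    // Only the io_uring encoder should need the raw value.
    fn raw(self) -> u64 {
        self.0
    }
}

struct TaskIds(Cell<u64>);

impl TaskIds {
    fn next(&self) -> TaskId {
        // Start at 1 so that the Default id (0) never names a live task.
        let id = self.0.get() + 1;
        self.0.set(id);
        TaskId(id)
    }
}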
diff --git a/src/io_uring/ops.rs b/src/io_uring/ops.rs
index 64ea36b7..15fc1235 100644
--- a/src/io_uring/ops.rs
+++ b/src/io_uring/ops.rs
@@ -5,6 +5,7 @@ pub mod async_cancel;
 pub mod connect;
 pub mod poll;
 pub mod read_write;
+pub mod read_write_no_cancel;
 pub mod recvmsg;
 pub mod sendmsg;
 pub mod timeout;
diff --git a/src/io_uring/ops/accept.rs b/src/io_uring/ops/accept.rs
index 6f751dcf..149592ea 100644
--- a/src/io_uring/ops/accept.rs
+++ b/src/io_uring/ops/accept.rs
@@ -2,7 +2,7 @@ use {
     crate::io_uring::{
         pending_result::PendingResult,
         sys::{io_uring_sqe, IORING_OP_ACCEPT},
-        IoUring, IoUringData, IoUringError, Task, TaskResultExt,
+        IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
     },
     std::rc::Rc,
     uapi::{c, OwnedFd},
@@ -39,14 +39,14 @@ struct Data {

 #[derive(Default)]
 pub struct AcceptTask {
-    id: u64,
+    id: IoUringTaskId,
     fd: i32,
     flags: u32,
     data: Option<Data>,
 }

 unsafe impl Task for AcceptTask {
-    fn id(&self) -> u64 {
+    fn id(&self) -> IoUringTaskId {
         self.id
     }
diff --git a/src/io_uring/ops/async_cancel.rs b/src/io_uring/ops/async_cancel.rs
index d322fd3b..3874ab98 100644
--- a/src/io_uring/ops/async_cancel.rs
+++ b/src/io_uring/ops/async_cancel.rs
@@ -2,7 +2,7 @@ use {
     crate::{
         io_uring::{
             sys::{io_uring_sqe, IORING_OP_ASYNC_CANCEL},
-            IoUringData, Task,
+            IoUringData, IoUringTaskId, Task,
         },
         utils::errorfmt::ErrorFmt,
     },
@@ -11,12 +11,12 @@ use {

 #[derive(Default)]
 pub struct AsyncCancelTask {
-    id: u64,
-    target: u64,
+    id: IoUringTaskId,
+    target: IoUringTaskId,
 }

 impl IoUringData {
-    pub fn cancel_task_in_kernel(&self, target: u64) {
+    pub fn cancel_task_in_kernel(&self, target: IoUringTaskId) {
         let id = self.id_raw();
         let mut task = self.cached_cancels.pop().unwrap_or_default();
         task.id = id;
@@ -26,7 +26,7 @@ impl IoUringData {
 }

 unsafe impl Task for AsyncCancelTask {
-    fn id(&self) -> u64 {
+    fn id(&self) -> IoUringTaskId {
         self.id
     }

@@ -41,7 +41,7 @@ unsafe impl Task for AsyncCancelTask {

     fn encode(&self, sqe: &mut io_uring_sqe) {
         sqe.opcode = IORING_OP_ASYNC_CANCEL;
-        sqe.u2.addr = self.target;
+        sqe.u2.addr = self.target.raw();
     }

     fn is_cancel(&self) -> bool {
diff --git a/src/io_uring/ops/connect.rs b/src/io_uring/ops/connect.rs
index 0aca6829..a23d460e 100644
--- a/src/io_uring/ops/connect.rs
+++ b/src/io_uring/ops/connect.rs
@@ -2,7 +2,7 @@ use {
     crate::io_uring::{
         pending_result::PendingResult,
         sys::{io_uring_sqe, IORING_OP_CONNECT},
-        IoUring, IoUringData, IoUringError, Task, TaskResultExt,
+        IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
     },
     std::{mem, ptr, rc::Rc},
     uapi::{c, OwnedFd, SockAddr},
@@ -37,7 +37,7 @@ struct Data {
 }

 pub struct ConnectTask {
-    id: u64,
+    id: IoUringTaskId,
     fd: i32,
     sockaddr: c::sockaddr_storage,
     addrlen: u64,
@@ -47,7 +47,7 @@ pub struct ConnectTask {
 impl Default for ConnectTask {
     fn default() -> Self {
         Self {
-            id: 0,
+            id: Default::default(),
             fd: 0,
             sockaddr: uapi::pod_zeroed(),
             addrlen: 0,
@@ -57,7 +57,7 @@ impl Default for ConnectTask {
 }

 unsafe impl Task for ConnectTask {
-    fn id(&self) -> u64 {
+    fn id(&self) -> IoUringTaskId {
         self.id
     }
diff --git a/src/io_uring/ops/poll.rs b/src/io_uring/ops/poll.rs
index 6edab2d4..1ac45281 100644
--- a/src/io_uring/ops/poll.rs
+++ b/src/io_uring/ops/poll.rs
@@ -3,7 +3,7 @@ use {
         ops::TaskResult,
         pending_result::PendingResult,
         sys::{io_uring_sqe, IORING_OP_POLL_ADD},
-        IoUring, IoUringData, IoUringError, Task, TaskResultExt,
+        IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
     },
     std::rc::Rc,
     uapi::{c, OwnedFd},
@@ -45,14 +45,14 @@ struct Data {

 #[derive(Default)]
 pub struct PollTask {
-    id: u64,
+    id: IoUringTaskId,
     events: u16,
     fd: i32,
     data: Option<Data>,
 }

 unsafe impl Task for PollTask {
-    fn id(&self) -> u64 {
+    fn id(&self) -> IoUringTaskId {
         self.id
     }
diff --git a/src/io_uring/ops/read_write.rs b/src/io_uring/ops/read_write.rs
index 9ebf2c1d..f8db92cb 100644
--- a/src/io_uring/ops/read_write.rs
+++ b/src/io_uring/ops/read_write.rs
@@ -3,7 +3,7 @@ use {
         io_uring::{
             pending_result::PendingResult,
             sys::{io_uring_sqe, IORING_OP_READ, IORING_OP_WRITE},
-            IoUring, IoUringData, IoUringError, Task, TaskResultExt,
+            IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
         },
         time::Time,
         utils::buf::Buf,
@@ -66,7 +66,7 @@ struct ReadWriteTaskData {

 #[derive(Default)]
 pub struct ReadWriteTask {
-    id: u64,
+    id: IoUringTaskId,
     has_timeout: bool,
     fd: c::c_int,
     buf: usize,
@@ -76,7 +76,7 @@ pub struct ReadWriteTask {
 }

 unsafe impl Task for ReadWriteTask {
-    fn id(&self) -> u64 {
+    fn id(&self) -> IoUringTaskId {
         self.id
     }
diff --git a/src/io_uring/ops/read_write_no_cancel.rs b/src/io_uring/ops/read_write_no_cancel.rs
new file mode 100644
index 00000000..02cc2107
--- /dev/null
+++ b/src/io_uring/ops/read_write_no_cancel.rs
@@ -0,0 +1,137 @@
+#[cfg(test)]
+mod tests;
+
+use {
+    crate::{
+        io_uring::{
+            pending_result::PendingResult,
+            sys::{io_uring_sqe, IORING_OP_READ, IORING_OP_WRITE},
+            IoUring, IoUringData, IoUringError, IoUringTaskId, Task, TaskResultExt,
+        },
+        time::Time,
+        utils::on_drop::OnDrop,
+    },
+    uapi::{c, Fd},
+};
+
+impl IoUring {
+    pub async fn read_no_cancel(
+        &self,
+        fd: Fd,
+        offset: usize,
+        buf: &mut [u8],
+        cancel: impl FnOnce(IoUringTaskId),
+    ) -> Result<usize, IoUringError> {
+        self.perform_no_cancel(
+            fd,
+            offset,
+            buf.as_mut_ptr(),
+            buf.len(),
+            None,
+            IORING_OP_READ,
+            cancel,
+        )
+        .await
+    }
+
+    pub async fn write_no_cancel(
+        &self,
+        fd: Fd,
+        offset: usize,
+        buf: &[u8],
+        timeout: Option