diff --git a/src/cpu_worker.rs b/src/cpu_worker.rs index a17888b7..d693f534 100644 --- a/src/cpu_worker.rs +++ b/src/cpu_worker.rs @@ -11,7 +11,7 @@ use { ptr_ext::MutPtrExt, queue::AsyncQueue, stack::Stack, }, }, - parking_lot::Mutex, + parking_lot::{Condvar, Mutex}, std::{ any::Any, cell::{Cell, RefCell}, @@ -113,18 +113,25 @@ enum Job { unsafe impl Send for Job {} +#[derive(Default)] +struct CompletedJobsExchange { + queue: VecDeque<CpuJobId>, + condvar: Option<Arc<Condvar>>, +} + struct CpuWorkerData { next: CpuJobIds, jobs_to_enqueue: AsyncQueue<Job>, new_jobs: Arc<Mutex<VecDeque<Job>>>, have_new_jobs: Rc<OwnedFd>, - completed_jobs_remote: Arc<Mutex<VecDeque<CpuJobId>>>, + completed_jobs_remote: Arc<Mutex<CompletedJobsExchange>>, completed_jobs_local: RefCell<VecDeque<CpuJobId>>, have_completed_jobs: Rc<OwnedFd>, pending_jobs: CopyHashMap<CpuJobId, Rc<PendingJobData>>, ring: Rc<IoUring>, _stop: OwnedFd, pending_job_data_cache: Stack<Rc<PendingJobData>>, + sync_wake_condvar: Arc<Condvar>, } linear_ids!(CpuJobIds, CpuJobId, u64); @@ -172,12 +179,16 @@ impl Drop for PendingJob { self.job_data.state.set(PendingJobState::Abandoned); data.jobs_to_enqueue.push(Job::Cancel { id }); data.do_equeue_jobs(); - let mut buf = 0u64; - while data.pending_jobs.contains(&id) { - if let Err(e) = uapi::read(data.have_completed_jobs.raw(), &mut buf) { - panic!("Could not wait for job completions: {}", ErrorFmt(e)); - } + loop { data.dispatch_completions(); + if !data.pending_jobs.contains(&id) { + break; + } + let mut remote = data.completed_jobs_remote.lock(); + while remote.queue.is_empty() { + remote.condvar = Some(data.sync_wake_condvar.clone()); + data.sync_wake_condvar.wait(&mut remote); + } } } PendingJobState::Abandoned => {} @@ -204,7 +215,7 @@ impl CpuWorkerData { fn dispatch_completions(&self) { let completions = &mut *self.completed_jobs_local.borrow_mut(); - mem::swap(completions, &mut *self.completed_jobs_remote.lock()); + mem::swap(completions, &mut self.completed_jobs_remote.lock().queue); while let Some(id) = completions.pop_front() { let job_data = self.pending_jobs.remove(&id).unwrap(); let job = job_data.job.take().unwrap(); @@ -242,7 +253,7 @@ impl CpuWorkerData { impl CpuWorker { pub fn new(ring: &Rc<IoUring>, eng: &Rc<AsyncEngine>) -> Result<Self, CpuWorkerError> { let new_jobs: Arc<Mutex<VecDeque<Job>>> = Default::default(); - let completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>> = Default::default(); + let completed_jobs: Arc<Mutex<CompletedJobsExchange>> = Default::default(); let (stop_read, stop_write) = uapi::pipe2(c::O_CLOEXEC).map_err(|e| CpuWorkerError::Pipe(e.into()))?; let have_new_jobs = @@ -281,6 +292,7 @@ impl CpuWorker { ring: ring.clone(), _stop: stop_read, pending_job_data_cache: Default::default(), + sync_wake_condvar: Arc::new(Condvar::new()), }); Ok(Self { _completions_listener: eng.spawn( @@ -313,7 +325,7 @@ impl CpuWorker { fn work( new_jobs: Arc<Mutex<VecDeque<Job>>>, - completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>>, + completed_jobs: Arc<Mutex<CompletedJobsExchange>>, stop: OwnedFd, have_new_jobs: OwnedFd, have_completed_jobs: OwnedFd, @@ -343,7 +355,7 @@ fn work( struct Worker { eng: Rc<AsyncEngine>, ring: Rc<IoUring>, - completed_jobs: Arc<Mutex<VecDeque<CpuJobId>>>, + completed_jobs: Arc<Mutex<CompletedJobsExchange>>, have_completed_jobs: OwnedFd, async_jobs: CopyHashMap<CpuJobId, AsyncJob>, stopped: Cell<bool>, @@ -428,7 +440,14 @@ impl Worker { } fn send_completion(&self, id: CpuJobId) { - self.completed_jobs.lock().push_back(id); + let cv = { + let mut exchange = self.completed_jobs.lock(); + exchange.queue.push_back(id); + exchange.condvar.take() + }; + if let Some(cv) = cv { + cv.notify_all(); + } if let Err(e) = uapi::eventfd_write(self.have_completed_jobs.raw(), 1) { panic!("Could not signal job completion: {}", ErrorFmt(e)); }