From 1675d45dd9adca8167eff7f28d51c1ddade4d74c Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Thu, 12 Dec 2024 09:26:43 +0300 Subject: [PATCH 1/2] fix: handle only the latest waker --- packages/apalis-core/src/worker/mod.rs | 33 +++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index bb07621..95b7004 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -302,7 +302,7 @@ impl Worker> { let ctx = Context { running: Arc::default(), task_count: Arc::default(), - wakers: Arc::default(), + waker: Arc::default(), shutdown: self.state.shutdown, event_handler: self.state.event_handler.clone(), is_ready: Arc::default(), @@ -400,7 +400,7 @@ impl Future for Runnable { #[derive(Clone, Default)] pub struct Context { task_count: Arc, - wakers: Arc>>, + waker: Arc>>, running: Arc, shutdown: Option, event_handler: EventHandler, @@ -469,9 +469,9 @@ impl Context { } pub(crate) fn wake(&self) { - if let Ok(mut wakers) = self.wakers.lock() { - for waker in wakers.drain(..) { - waker.wake(); + if let Ok(waker) = self.waker.lock() { + if let Some(waker) = &*waker { + waker.clone().wake(); } } } @@ -501,13 +501,26 @@ impl Context { } fn add_waker(&self, cx: &mut TaskCtx<'_>) { - if let Ok(mut wakers) = self.wakers.lock() { - if !wakers.iter().any(|w| w.will_wake(cx.waker())) { - wakers.push(cx.waker().clone()); + if let Ok(mut waker_guard) = self.waker.lock() { + if waker_guard + .as_ref() + .map_or(true, |stored_waker| !stored_waker.will_wake(cx.waker())) + { + *waker_guard = Some(cx.waker().clone()); } } } + /// Checks if the stored waker matches the current one. + fn has_recent_waker(&self, cx: &TaskCtx<'_>) -> bool { + if let Ok(waker_guard) = self.waker.lock() { + if let Some(stored_waker) = &*waker_guard { + return stored_waker.will_wake(cx.waker()); + } + } + false + } + /// Returns if the worker is ready to consume new tasks pub fn is_ready(&self) -> bool { self.is_ready.load(Ordering::Acquire) && !self.is_shutting_down() @@ -522,7 +535,9 @@ impl Future for Context { if self.is_shutting_down() && task_count == 0 { Poll::Ready(()) } else { - self.add_waker(cx); + if !self.has_recent_waker(cx) { + self.add_waker(cx); + } Poll::Pending } } From 8ee3dea66029fce381d7e67bd1c305c4d996ff09 Mon Sep 17 00:00:00 2001 From: geofmureithi Date: Thu, 12 Dec 2024 09:53:08 +0300 Subject: [PATCH 2/2] lint: clippy --- packages/apalis-core/src/worker/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/apalis-core/src/worker/mod.rs b/packages/apalis-core/src/worker/mod.rs index 95b7004..0e025dc 100644 --- a/packages/apalis-core/src/worker/mod.rs +++ b/packages/apalis-core/src/worker/mod.rs @@ -471,7 +471,7 @@ impl Context { pub(crate) fn wake(&self) { if let Ok(waker) = self.waker.lock() { if let Some(waker) = &*waker { - waker.clone().wake(); + waker.wake_by_ref(); } } }