diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 0684f4251..5c36b38e7 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -472,7 +472,9 @@ where obj_ref, reason: reschedule_reason, }, - run_at: reconciler_finished_at + requeue_after, + run_at: reconciler_finished_at + .checked_add(requeue_after) + .unwrap_or_else(crate::scheduler::far_future), }), result: Some(result), } diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index d3dbb2c8d..195b7a4ec 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -74,6 +74,10 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { // Message is already pending, so we can't even expedite it return; } + let next_time = request + .run_at + .checked_add(*self.debounce) + .unwrap_or_else(far_future); match self.scheduled.entry(request.message) { // If new request is supposed to be earlier than the current entry's scheduled // time (for eg: the new request is user triggered and the current entry is the @@ -81,9 +85,8 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => { // Old entry will run after the new request, so replace it.. let entry = old_entry.get_mut(); - self.queue - .reset_at(&entry.queue_key, request.run_at + *self.debounce); - entry.run_at = request.run_at + *self.debounce; + self.queue.reset_at(&entry.queue_key, next_time); + entry.run_at = next_time; old_entry.replace_key(); } Entry::Occupied(_old_entry) => { @@ -93,8 +96,8 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { // No old entry, we're free to go! let message = entry.key().clone(); entry.insert(ScheduledEntry { - run_at: request.run_at + *self.debounce, - queue_key: self.queue.insert_at(message, request.run_at + *self.debounce), + run_at: next_time, + queue_key: self.queue.insert_at(message, next_time), }); } } @@ -280,6 +283,13 @@ pub fn debounced_scheduler Instant { + // private method from tokio for convenience - remove if upstream becomes pub + // https://github.com/tokio-rs/tokio/blob/6fcd9c02176bf3cd570bc7de88edaa3b95ea480a/tokio/src/time/instant.rs#L57-L63 + Instant::now() + Duration::from_secs(86400 * 365 * 30) +} + #[cfg(test)] mod tests { use crate::utils::KubeRuntimeStreamExt;