Skip to content

Commit

Permalink
Fix potentially panicing unchecked duration adds in runtime (#1489)
Browse files Browse the repository at this point in the history
* Fix potentially panicing unchecked duration adds in runtime

Fixes #1488

Signed-off-by: clux <[email protected]>

* use far_future as a default for instant overflow

Signed-off-by: clux <[email protected]>

---------

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux authored May 10, 2024
1 parent 22cb4a4 commit 3170f68
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
4 changes: 3 additions & 1 deletion kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,9 @@ where
obj_ref,
reason: reschedule_reason,
},
run_at: reconciler_finished_at + requeue_after,
run_at: reconciler_finished_at
.checked_add(requeue_after)
.unwrap_or_else(crate::scheduler::far_future),
}),
result: Some(result),
}
Expand Down
20 changes: 15 additions & 5 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,19 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
// Message is already pending, so we can't even expedite it
return;
}
let next_time = request
.run_at
.checked_add(*self.debounce)
.unwrap_or_else(far_future);
match self.scheduled.entry(request.message) {
// If new request is supposed to be earlier than the current entry's scheduled
// time (for eg: the new request is user triggered and the current entry is the
// reconciler's usual retry), then give priority to the new request.
Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
// Old entry will run after the new request, so replace it..
let entry = old_entry.get_mut();
self.queue
.reset_at(&entry.queue_key, request.run_at + *self.debounce);
entry.run_at = request.run_at + *self.debounce;
self.queue.reset_at(&entry.queue_key, next_time);
entry.run_at = next_time;
old_entry.replace_key();
}
Entry::Occupied(_old_entry) => {
Expand All @@ -93,8 +96,8 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
// No old entry, we're free to go!
let message = entry.key().clone();
entry.insert(ScheduledEntry {
run_at: request.run_at + *self.debounce,
queue_key: self.queue.insert_at(message, request.run_at + *self.debounce),
run_at: next_time,
queue_key: self.queue.insert_at(message, next_time),
});
}
}
Expand Down Expand Up @@ -280,6 +283,13 @@ pub fn debounced_scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleReques
Scheduler::new(requests, debounce)
}

// internal fallback for overflows in schedule times
pub(crate) fn far_future() -> Instant {
// private method from tokio for convenience - remove if upstream becomes pub
// https://github.com/tokio-rs/tokio/blob/6fcd9c02176bf3cd570bc7de88edaa3b95ea480a/tokio/src/time/instant.rs#L57-L63
Instant::now() + Duration::from_secs(86400 * 365 * 30)
}

#[cfg(test)]
mod tests {
use crate::utils::KubeRuntimeStreamExt;
Expand Down

0 comments on commit 3170f68

Please sign in to comment.