Skip to content

Commit

Permalink
Resetting interval window when a node starts failing. (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Feb 23, 2024
1 parent 0ec7a75 commit fa59b4c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 8 deletions.
38 changes: 34 additions & 4 deletions chitchat/src/failure_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ impl FailureDetector {
if !self.dead_nodes.contains_key(chitchat_id) {
self.dead_nodes.insert(chitchat_id.clone(), Instant::now());
}
// Remove current sampling window so that when the node
// Remove all samples, so that when the node
// comes back online, we start with a fresh sampling window.
// TODO is this the right idea?
// self.node_samples.remove(chitchat_id);
if let Some(node_sample) = self.node_samples.get_mut(chitchat_id) {
node_sample.reset();
}
}
}

Expand All @@ -77,8 +78,9 @@ impl FailureDetector {
garbage_collected_nodes.push(chitchat_id.clone())
}
}
for chitchat_id in garbage_collected_nodes.iter() {
for chitchat_id in &garbage_collected_nodes {
self.dead_nodes.remove(chitchat_id);
self.node_samples.remove(chitchat_id);
}
garbage_collected_nodes
}
Expand Down Expand Up @@ -222,6 +224,11 @@ impl SamplingWindow {
self.last_heartbeat = Some(now);
}

/// Forget about all previous intervals.
///
/// Only `self.intervals` is cleared here; no other field of the window is
/// modified by this method.
// NOTE(review): `last_heartbeat` is not reset, so presumably the next reported
// heartbeat is still measured against the pre-reset one — confirm that this is
// intended (e.g. that over-long intervals are discarded elsewhere).
pub fn reset(&mut self) {
self.intervals.clear();
}

/// Computes the sampling window's phi value.
/// Returns `None` if we have not received two heartbeats yet.
pub fn phi(&self) -> Option<f64> {
Expand Down Expand Up @@ -279,6 +286,12 @@ impl BoundedArrayStats {
}
}

/// Empties the stats: the running sum goes back to zero, the "filled" flag is
/// dropped and the write index returns to the start.
///
/// The `values` buffer itself is left untouched by this method.
// NOTE(review): presumably stale entries in `values` are harmless because
// `len()` only reports `values.len()` once `is_filled` is set — confirm.
pub fn clear(&mut self) {
    self.sum = 0.0;
    self.is_filled = false;
    self.index = 0;
}

fn len(&self) -> usize {
if self.is_filled {
return self.values.len();
Expand Down Expand Up @@ -514,6 +527,23 @@ mod tests {
tokio::time::advance(Duration::from_secs(2)).await;

assert_nearly_equal(sampling_window.phi().unwrap(), 2.0f64 / mean);

tokio::time::advance(Duration::from_secs(100)).await;
sampling_window.reset();

// To revive, a single sample is not sufficient.
sampling_window.report_heartbeat();
assert!(sampling_window.phi().is_none());

tokio::time::advance(Duration::from_secs(2)).await;
sampling_window.report_heartbeat();

tokio::time::advance(Duration::from_secs(4)).await;

// Now intervals window is: [2.0]. With additive smoothing we get:
let new_mean = (2.0 + 2.0 * 5.0) / (1.0f64 + 5.0f64);

assert_nearly_equal(sampling_window.phi().unwrap(), 4.0f64 / new_mean);
}

#[track_caller]
Expand Down
11 changes: 9 additions & 2 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,16 @@ impl ClusterState {
stale_nodes.insert(chitchat_id, node_state);
continue;
};
let should_reset = node_state.last_gc_version > node_digest.max_version;
// TODO: We have a problem here. If, after the delta, we end up with a max version that is
// not high enough to bring us to `last_gc_version`, we might get reset again
// and again.
let should_reset =
node_state.last_gc_version > node_digest.max_version && node_digest.max_version > 0;
if should_reset {
warn!("Node to reset {chitchat_id:?}");
warn!(
"Node to reset {chitchat_id:?} last gc version: {} max version: {}",
node_state.last_gc_version, node_digest.max_version
);
nodes_to_reset.push(chitchat_id);
stale_nodes.insert(chitchat_id, node_state);
continue;
Expand Down
10 changes: 8 additions & 2 deletions chitchat/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::{fmt::Debug, net::SocketAddr};

use serde::{Deserialize, Serialize};
use tokio::time::Instant;
Expand All @@ -14,7 +14,7 @@ use tokio::time::Instant;
/// leaves and rejoins the cluster. Backends such as Cassandra or Quickwit typically use the node's
/// startup time as the `generation_id`. Applications with stable state across restarts can use a
/// constant `generation_id`, for instance, `0`.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ChitchatId {
/// An identifier unique across the cluster.
pub node_id: String,
Expand All @@ -24,6 +24,12 @@ pub struct ChitchatId {
pub gossip_advertise_addr: SocketAddr,
}

/// Compact `Debug` rendering of a [`ChitchatId`], written as
/// `node_id:generation_id:gossip_advertise_addr`.
impl Debug for ChitchatId {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Name each component first so the final `write!` reads like the output.
        let node_id: &str = &self.node_id;
        let generation_id = &self.generation_id;
        let gossip_addr = &self.gossip_advertise_addr;
        write!(f, "{}:{}:{}", node_id, generation_id, gossip_addr)
    }
}

impl ChitchatId {
pub fn new(node_id: String, generation_id: u64, gossip_advertise_addr: SocketAddr) -> Self {
Self {
Expand Down

0 comments on commit fa59b4c

Please sign in to comment.