diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 01513d0..7a8a60b 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -159,8 +159,8 @@ impl Chitchat { fn report_heartbeats(&mut self, delta: &Delta) { for (chitchat_id, node_delta) in &delta.node_deltas { if let Some(node_state) = self.cluster_state.node_states.get(chitchat_id) { - if node_state.heartbeat < node_delta.heartbeat - || node_state.max_version < node_delta.max_version + if node_state.heartbeat() < node_delta.heartbeat + || node_state.max_version() < node_delta.max_version { self.failure_detector.report_heartbeat(chitchat_id); } @@ -184,7 +184,7 @@ impl Chitchat { let node_state = self .node_state(chitchat_id) .expect("Node state should exist."); - (chitchat_id.clone(), node_state.max_version) + (chitchat_id.clone(), node_state.max_version()) }) .collect::>(); @@ -307,9 +307,9 @@ mod tests { } fn assert_cluster_state_eq(lhs: &NodeState, rhs: &NodeState) { - assert_eq!(lhs.key_values.len(), rhs.key_values.len()); - for (key, value) in &lhs.key_values { - assert_eq!(rhs.key_values.get(key), Some(value)); + assert_eq!(lhs.num_key_values(), rhs.num_key_values()); + for (key, value) in lhs.key_values() { + assert_eq!(rhs.get_versioned(key), Some(value)); } } diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index c0c5ffd..0d0420c 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -17,9 +17,9 @@ use crate::{ChitchatId, Heartbeat, MaxVersion, Version, VersionedValue}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeState { - pub(crate) heartbeat: Heartbeat, - pub(crate) key_values: BTreeMap, - pub(crate) max_version: MaxVersion, + heartbeat: Heartbeat, + key_values: BTreeMap, + max_version: MaxVersion, #[serde(skip)] #[serde(default = "Instant::now")] last_heartbeat: Instant, @@ -38,7 +38,7 @@ impl Default for NodeState { impl NodeState { /// Returns the node's last heartbeat value. - pub fn hearbeat(&self) -> Heartbeat { + pub fn heartbeat(&self) -> Heartbeat { self.heartbeat } @@ -131,7 +131,6 @@ impl NodeState { /// Removes the keys marked for deletion such that `tombstone + grace_period > heartbeat`. fn gc_keys_marked_for_deletion(&mut self, grace_period: u64) { let heartbeat = self.heartbeat.0; - self.key_values .retain(|_, versioned_value: &mut VersionedValue| { versioned_value @@ -154,17 +153,33 @@ impl NodeState { .filter(move |(_key, versioned_value)| versioned_value.version > floor_version) } + /// Ignores the key value insert if the version is obsolete. + fn set_versioned_value(&mut self, key: String, versioned_value_update: VersionedValue) { + self.max_version = versioned_value_update.version.max(self.max_version); + match self.key_values.entry(key) { + Entry::Occupied(mut occupied) => { + let occupied_versioned_value = occupied.get_mut(); + // The current version is more recent than the newer version. + if occupied_versioned_value.version >= versioned_value_update.version { + return; + } + *occupied_versioned_value = versioned_value_update; + } + Entry::Vacant(vacant) => { + vacant.insert(versioned_value_update); + } + }; + } + fn set_with_version(&mut self, key: String, value: String, version: Version) { - assert!(version > self.max_version); - self.max_version = version; - self.key_values.insert( + self.set_versioned_value( key, VersionedValue { - version, value, + version, tombstone: None, }, - ); + ) } } @@ -227,22 +242,9 @@ impl ClusterState { node_state.last_heartbeat = Instant::now(); } node_state.max_version = node_state.max_version.max(node_delta.max_version); - for (key, versioned_value) in node_delta.key_values { - let entry = node_state.key_values.entry(key); - match entry { - Entry::Occupied(mut occupied_entry) => { - let local_versioned_value = occupied_entry.get_mut(); - // Only update the value if the new version is higher. It is possible that - // we have already received a fresher version from another node. - if local_versioned_value.version < versioned_value.version { - *local_versioned_value = versioned_value; - } - } - Entry::Vacant(vacant_entry) => { - vacant_entry.insert(versioned_value); - } - } + node_state.max_version = node_state.max_version.max(versioned_value.version); + node_state.set_versioned_value(key, versioned_value); } } } diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index dccd7ab..9329f59 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -82,8 +82,8 @@ impl Simulator { } pub async fn execute(&mut self, operations: Vec) { - for operation in operations.into_iter() { - debug!("Execute operation {operation:?}"); + for operation in operations { + info!("Execute operation {operation:?}"); match operation { Operation::AddNode { chitchat_id, @@ -184,7 +184,7 @@ impl Simulator { let chitchat = self.node_handles.get(&chitchat_id).unwrap().chitchat(); let mut chitchat_guard = chitchat.lock().await; chitchat_guard.self_node_state().mark_for_deletion(&key); - let hearbeat = chitchat_guard.self_node_state().hearbeat(); + let hearbeat = chitchat_guard.self_node_state().heartbeat(); debug!(node_id=%chitchat_id.node_id, key=%key, hearbeat=?hearbeat, "Marked key for deletion."); }