diff --git a/chitchat/Cargo.toml b/chitchat/Cargo.toml index 315c188..8169c23 100644 --- a/chitchat/Cargo.toml +++ b/chitchat/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "chitchat" -version = "0.5.0" +version = "0.6.0" edition = "2021" license = "MIT" authors = ["Quickwit, Inc. "] diff --git a/chitchat/src/failure_detector.rs b/chitchat/src/failure_detector.rs index f61ec14..a41373b 100644 --- a/chitchat/src/failure_detector.rs +++ b/chitchat/src/failure_detector.rs @@ -264,7 +264,7 @@ mod tests { let mut rng = rand::thread_rng(); let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); - let intervals_choices = vec![1u64, 2]; + let intervals_choices = [1u64, 2]; let chitchat_ids_choices = vec![ ChitchatId::for_local_test(10_001), ChitchatId::for_local_test(10_002), @@ -334,7 +334,7 @@ mod tests { fn test_failure_detector_node_state_from_live_to_down_to_live() { let mut rng = rand::thread_rng(); let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); - let intervals_choices = vec![1u64, 2]; + let intervals_choices = [1u64, 2]; let node_1 = ChitchatId::for_local_test(10_001); for _ in 0..=2000 { diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index b3e9799..909faff 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -1,6 +1,7 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, HashSet}; use std::net::SocketAddr; +use std::ops::Bound; use std::time::Instant; use itertools::Itertools; @@ -48,11 +49,21 @@ impl NodeState { /// Returns an iterator over the keys matching the given predicate, excluding keys marked for /// deletion. - pub fn key_values( - &self, - predicate: impl Fn(&String, &VersionedValue) -> bool, - ) -> impl Iterator { - self.internal_iter_key_values(predicate) + pub fn key_values(&self) -> impl Iterator { + self.internal_iter_key_values() + .filter(|&(_, versioned_value)| versioned_value.tombstone.is_none()) + } + + /// Returns key values matching a prefix + pub fn iter_prefix<'a>( + &'a self, + prefix: &'a str, + ) -> impl Iterator + 'a { + let range = (Bound::Included(prefix), Bound::Unbounded); + self.key_values + .range::(range) + .take_while(move |(key, _)| key.starts_with(prefix)) + .map(|(key, record)| (key.as_str(), record)) .filter(|&(_, versioned_value)| versioned_value.tombstone.is_none()) } @@ -86,7 +97,10 @@ impl NodeState { /// Marks key for deletion and sets the value to an empty string. pub fn mark_for_deletion(&mut self, key: &str) { let Some(versioned_value) = self.key_values.get_mut(key) else { - warn!("Key `{}` does not exist in the node's state and could not be marked for deletion.", key); + warn!( + "Key `{key}` does not exist in the node's state and could not be marked for \ + deletion.", + ); return; }; self.max_version += 1; @@ -108,13 +122,9 @@ impl NodeState { /// Returns an iterator over keys matching the given predicate. /// Not public as it returns also keys marked for deletion. - fn internal_iter_key_values( - &self, - predicate: impl Fn(&String, &VersionedValue) -> bool, - ) -> impl Iterator { + fn internal_iter_key_values(&self) -> impl Iterator { self.key_values .iter() - .filter(move |(key, versioned_value)| predicate(key, versioned_value)) .map(|(key, record)| (key.as_str(), record)) } @@ -138,9 +148,8 @@ impl NodeState { floor_version: u64, ) -> impl Iterator { // TODO optimize by checking the max version. - self.internal_iter_key_values(move |_key, versioned_value| { - versioned_value.version > floor_version - }) + self.internal_iter_key_values() + .filter(move |(_key, versioned_value)| versioned_value.version > floor_version) } fn set_with_version(&mut self, key: String, value: String, version: Version) { @@ -210,8 +219,7 @@ impl ClusterState { // Apply delta. for (chitchat_id, node_delta) in delta.node_deltas { - let mut node_state = self.node_states.entry(chitchat_id).or_default(); - + let node_state = self.node_states.entry(chitchat_id).or_default(); if node_state.heartbeat < node_delta.heartbeat { node_state.heartbeat = node_delta.heartbeat; node_state.last_heartbeat = Instant::now(); @@ -1055,4 +1063,22 @@ mod tests { assert_eq!(delta, expected_delta); } } + + #[test] + fn test_iter_prefix() { + let mut node_state = NodeState::default(); + node_state.set("Europe", ""); + node_state.set("Europe:", ""); + node_state.set("Europe:UK", ""); + node_state.set("Asia:Japan", ""); + node_state.set("Europe:Italy", ""); + node_state.set("Africa:Uganda", ""); + node_state.set("Oceania", ""); + node_state.mark_for_deletion("Europe:UK"); + let node_states: Vec<&str> = node_state + .iter_prefix("Europe:") + .map(|(key, _v)| key) + .collect(); + assert_eq!(node_states, &["Europe:", "Europe:Italy"]); + } }