Skip to content

Commit

Permalink
API change.
Browse files Browse the repository at this point in the history
The key_value taking a predicate is useless, as it simply relies on
Iterator::filter anyway.

Added a `.iter_prefix` method that relies on the BTreeMap ordering.
  • Loading branch information
fulmicoton committed Oct 27, 2023
1 parent c939d19 commit 0016762
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
2 changes: 1 addition & 1 deletion chitchat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "chitchat"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
license = "MIT"
authors = ["Quickwit, Inc. <[email protected]>"]
Expand Down
4 changes: 2 additions & 2 deletions chitchat/src/failure_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ mod tests {
let mut rng = rand::thread_rng();
let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default());

let intervals_choices = vec![1u64, 2];
let intervals_choices = [1u64, 2];
let chitchat_ids_choices = vec![
ChitchatId::for_local_test(10_001),
ChitchatId::for_local_test(10_002),
Expand Down Expand Up @@ -334,7 +334,7 @@ mod tests {
fn test_failure_detector_node_state_from_live_to_down_to_live() {
let mut rng = rand::thread_rng();
let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default());
let intervals_choices = vec![1u64, 2];
let intervals_choices = [1u64, 2];
let node_1 = ChitchatId::for_local_test(10_001);

for _ in 0..=2000 {
Expand Down
58 changes: 42 additions & 16 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashSet};
use std::net::SocketAddr;
use std::ops::Bound;
use std::time::Instant;

use itertools::Itertools;
Expand Down Expand Up @@ -48,11 +49,21 @@ impl NodeState {

/// Returns an iterator over the keys matching the given predicate, excluding keys marked for
/// deletion.
pub fn key_values(
&self,
predicate: impl Fn(&String, &VersionedValue) -> bool,
) -> impl Iterator<Item = (&str, &VersionedValue)> {
self.internal_iter_key_values(predicate)
pub fn key_values(&self) -> impl Iterator<Item = (&str, &VersionedValue)> {
self.internal_iter_key_values()
.filter(|&(_, versioned_value)| versioned_value.tombstone.is_none())
}

/// Returns key values matching a prefix
pub fn iter_prefix<'a>(
&'a self,
prefix: &'a str,
) -> impl Iterator<Item = (&'a str, &'a VersionedValue)> + 'a {
let range = (Bound::Included(prefix), Bound::Unbounded);
self.key_values
.range::<str, _>(range)
.take_while(move |(key, _)| key.starts_with(prefix))
.map(|(key, record)| (key.as_str(), record))
.filter(|&(_, versioned_value)| versioned_value.tombstone.is_none())
}

Expand Down Expand Up @@ -86,7 +97,10 @@ impl NodeState {
/// Marks key for deletion and sets the value to an empty string.
pub fn mark_for_deletion(&mut self, key: &str) {
let Some(versioned_value) = self.key_values.get_mut(key) else {
warn!("Key `{}` does not exist in the node's state and could not be marked for deletion.", key);
warn!(
"Key `{key}` does not exist in the node's state and could not be marked for \
deletion.",
);
return;
};
self.max_version += 1;
Expand All @@ -108,13 +122,9 @@ impl NodeState {

/// Returns an iterator over keys matching the given predicate.
/// Not public as it returns also keys marked for deletion.
fn internal_iter_key_values(
&self,
predicate: impl Fn(&String, &VersionedValue) -> bool,
) -> impl Iterator<Item = (&str, &VersionedValue)> {
fn internal_iter_key_values(&self) -> impl Iterator<Item = (&str, &VersionedValue)> {
self.key_values
.iter()
.filter(move |(key, versioned_value)| predicate(key, versioned_value))
.map(|(key, record)| (key.as_str(), record))
}

Expand All @@ -138,9 +148,8 @@ impl NodeState {
floor_version: u64,
) -> impl Iterator<Item = (&str, &VersionedValue)> {
// TODO optimize by checking the max version.
self.internal_iter_key_values(move |_key, versioned_value| {
versioned_value.version > floor_version
})
self.internal_iter_key_values()
.filter(move |(_key, versioned_value)| versioned_value.version > floor_version)
}

fn set_with_version(&mut self, key: String, value: String, version: Version) {
Expand Down Expand Up @@ -210,8 +219,7 @@ impl ClusterState {

// Apply delta.
for (chitchat_id, node_delta) in delta.node_deltas {
let mut node_state = self.node_states.entry(chitchat_id).or_default();

let node_state = self.node_states.entry(chitchat_id).or_default();
if node_state.heartbeat < node_delta.heartbeat {
node_state.heartbeat = node_delta.heartbeat;
node_state.last_heartbeat = Instant::now();
Expand Down Expand Up @@ -1055,4 +1063,22 @@ mod tests {
assert_eq!(delta, expected_delta);
}
}

#[test]
fn test_iter_prefix() {
let mut node_state = NodeState::default();
node_state.set("Europe", "");
node_state.set("Europe:", "");
node_state.set("Europe:UK", "");
node_state.set("Asia:Japan", "");
node_state.set("Europe:Italy", "");
node_state.set("Africa:Uganda", "");
node_state.set("Oceania", "");
node_state.mark_for_deletion("Europe:UK");
let node_states: Vec<&str> = node_state
.iter_prefix("Europe:")
.map(|(key, _v)| key)
.collect();
assert_eq!(node_states, &["Europe:", "Europe:Italy"]);
}
}

0 comments on commit 0016762

Please sign in to comment.