Skip to content

Commit

Permalink
Refactoring: node state writes go through the same accessor method. (#97
Browse files Browse the repository at this point in the history
)

This includes both self updates and apply_delta
  • Loading branch information
fulmicoton authored Nov 16, 2023
1 parent 66aab76 commit 8514956
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 31 deletions.
12 changes: 6 additions & 6 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ impl Chitchat {
fn report_heartbeats(&mut self, delta: &Delta) {
for (chitchat_id, node_delta) in &delta.node_deltas {
if let Some(node_state) = self.cluster_state.node_states.get(chitchat_id) {
if node_state.heartbeat < node_delta.heartbeat
|| node_state.max_version < node_delta.max_version
if node_state.heartbeat() < node_delta.heartbeat
|| node_state.max_version() < node_delta.max_version
{
self.failure_detector.report_heartbeat(chitchat_id);
}
Expand All @@ -184,7 +184,7 @@ impl Chitchat {
let node_state = self
.node_state(chitchat_id)
.expect("Node state should exist.");
(chitchat_id.clone(), node_state.max_version)
(chitchat_id.clone(), node_state.max_version())
})
.collect::<HashMap<_, _>>();

Expand Down Expand Up @@ -307,9 +307,9 @@ mod tests {
}

fn assert_cluster_state_eq(lhs: &NodeState, rhs: &NodeState) {
assert_eq!(lhs.key_values.len(), rhs.key_values.len());
for (key, value) in &lhs.key_values {
assert_eq!(rhs.key_values.get(key), Some(value));
assert_eq!(lhs.num_key_values(), rhs.num_key_values());
for (key, value) in lhs.key_values() {
assert_eq!(rhs.get_versioned(key), Some(value));
}
}

Expand Down
52 changes: 29 additions & 23 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use crate::{ChitchatId, Heartbeat, MaxVersion, Version, VersionedValue};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeState {
pub(crate) heartbeat: Heartbeat,
pub(crate) key_values: BTreeMap<String, VersionedValue>,
pub(crate) max_version: MaxVersion,
heartbeat: Heartbeat,
key_values: BTreeMap<String, VersionedValue>,
max_version: MaxVersion,
#[serde(skip)]
#[serde(default = "Instant::now")]
last_heartbeat: Instant,
Expand All @@ -38,7 +38,7 @@ impl Default for NodeState {

impl NodeState {
/// Returns the node's last heartbeat value.
pub fn hearbeat(&self) -> Heartbeat {
pub fn heartbeat(&self) -> Heartbeat {
self.heartbeat
}

Expand Down Expand Up @@ -131,7 +131,6 @@ impl NodeState {
/// Removes the keys marked for deletion such that `tombstone + grace_period > heartbeat`.
fn gc_keys_marked_for_deletion(&mut self, grace_period: u64) {
let heartbeat = self.heartbeat.0;

self.key_values
.retain(|_, versioned_value: &mut VersionedValue| {
versioned_value
Expand All @@ -154,14 +153,34 @@ impl NodeState {
.filter(move |(_key, versioned_value)| versioned_value.version > floor_version)
}

/// Sets a new versioned value to associate to a given key.
/// This operation is ignored if the key value inserted has a version that is obsolete.
///
/// This method also update the max_version if necessary.
fn set_versioned_value(&mut self, key: String, versioned_value_update: VersionedValue) {
self.max_version = versioned_value_update.version.max(self.max_version);
match self.key_values.entry(key) {
Entry::Occupied(mut occupied) => {
let occupied_versioned_value = occupied.get_mut();
// The current version is more recent than the newer version.
if occupied_versioned_value.version >= versioned_value_update.version {
return;
}
*occupied_versioned_value = versioned_value_update;
}
Entry::Vacant(vacant) => {
vacant.insert(versioned_value_update);
}
};
}

fn set_with_version(&mut self, key: String, value: String, version: Version) {
assert!(version > self.max_version);
self.max_version = version;
self.key_values.insert(
self.set_versioned_value(
key,
VersionedValue {
version,
value,
version,
tombstone: None,
},
);
Expand Down Expand Up @@ -227,22 +246,9 @@ impl ClusterState {
node_state.last_heartbeat = Instant::now();
}
node_state.max_version = node_state.max_version.max(node_delta.max_version);

for (key, versioned_value) in node_delta.key_values {
let entry = node_state.key_values.entry(key);
match entry {
Entry::Occupied(mut occupied_entry) => {
let local_versioned_value = occupied_entry.get_mut();
// Only update the value if the new version is higher. It is possible that
// we have already received a fresher version from another node.
if local_versioned_value.version < versioned_value.version {
*local_versioned_value = versioned_value;
}
}
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(versioned_value);
}
}
node_state.max_version = node_state.max_version.max(versioned_value.version);
node_state.set_versioned_value(key, versioned_value);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions chitchat/tests/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Simulator {
}

pub async fn execute(&mut self, operations: Vec<Operation>) {
for operation in operations.into_iter() {
for operation in operations {
debug!("Execute operation {operation:?}");
match operation {
Operation::AddNode {
Expand Down Expand Up @@ -184,7 +184,7 @@ impl Simulator {
let chitchat = self.node_handles.get(&chitchat_id).unwrap().chitchat();
let mut chitchat_guard = chitchat.lock().await;
chitchat_guard.self_node_state().mark_for_deletion(&key);
let hearbeat = chitchat_guard.self_node_state().hearbeat();
let hearbeat = chitchat_guard.self_node_state().heartbeat();
debug!(node_id=%chitchat_id.node_id, key=%key, hearbeat=?hearbeat, "Marked key for deletion.");
}

Expand Down

0 comments on commit 8514956

Please sign in to comment.