From 167c9137b195684a61a1749369d3ad0820acf92d Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 12 Feb 2024 14:47:46 +0900 Subject: [PATCH] code simplification --- chitchat/Cargo.toml | 1 + chitchat/src/delta.rs | 98 +++++++++++++++---------------- chitchat/src/lib.rs | 21 +++---- chitchat/src/serialize.rs | 37 ++++++++++-- chitchat/src/server.rs | 2 - chitchat/src/state.rs | 32 +++++----- chitchat/src/transport/channel.rs | 3 +- chitchat/tests/cluster_test.rs | 23 ++++---- 8 files changed, 120 insertions(+), 97 deletions(-) diff --git a/chitchat/Cargo.toml b/chitchat/Cargo.toml index dc68655..f486241 100644 --- a/chitchat/Cargo.toml +++ b/chitchat/Cargo.toml @@ -31,6 +31,7 @@ zstd = "0.13" assert-json-diff = "2" mock_instant = "0.3" tracing-subscriber = "0.3" +proptest = "1.4" [features] testsuite = [] diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 5399560..8260068 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -1,12 +1,16 @@ use std::collections::HashSet; use crate::serialize::*; -use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue}; +use crate::{ChitchatId, Heartbeat, VersionedValue}; +/// A delta is the message we send to another node to update it. +/// +/// Its serialization is done by transforming it into a sequence of operations, +/// encoded one after the other in a compressed stream. #[derive(Debug, Default, Eq, PartialEq)] pub struct Delta { pub(crate) nodes_to_reset: Vec, - pub(crate) node_deltas: Vec<(ChitchatId, NodeDelta)>, + pub(crate) node_deltas: Vec, } impl Delta { @@ -15,21 +19,18 @@ impl Delta { .nodes_to_reset .iter() .map(|node_to_reset| DeltaOpRef::NodeToReset(node_to_reset)); - let node_deltas = self - .node_deltas - .iter() - .flat_map(|(chitchat_id, node_delta)| { - std::iter::once(DeltaOpRef::Node { - chitchat_id, - heartbeat: node_delta.heartbeat, - }) - .chain(node_delta.key_values.iter().map( - |(key, versioned_value)| DeltaOpRef::KeyValue { - key, - versioned_value, - }, - )) - }); + let node_deltas = self.node_deltas.iter().flat_map(|node_delta| { + std::iter::once(DeltaOpRef::Node { + chitchat_id: &node_delta.chitchat_id, + heartbeat: node_delta.heartbeat, + }) + .chain(node_delta.key_values.iter().map(|(key, versioned_value)| { + DeltaOpRef::KeyValue { + key, + versioned_value, + } + })) + }); nodes_to_reset_ops.chain(node_deltas) } } @@ -245,40 +246,36 @@ impl Delta { pub fn num_tuples(&self) -> usize { self.node_deltas .iter() - .map(|(_, node_delta)| node_delta.num_tuples()) + .map(|node_delta| node_delta.num_tuples()) .sum() } - pub fn add_node(&mut self, target_chitchat_id: ChitchatId, heartbeat: Heartbeat) { + pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) { assert!(self .node_deltas .iter() - .find(|(chitchat_id, node_delta)| { chitchat_id == &target_chitchat_id }) + .find(|node_delta| { &node_delta.chitchat_id == &chitchat_id }) .is_none()); - self.node_deltas.push(( - target_chitchat_id, - NodeDelta { - heartbeat, - ..Default::default() - }, - )); + self.node_deltas.push(NodeDelta { + chitchat_id, + heartbeat, + key_values: Vec::new(), + }); } pub fn add_kv( &mut self, - target_chitchat_id: &ChitchatId, + chitchat_id: &ChitchatId, key: &str, value: &str, version: crate::Version, tombstone: Option, ) { - let (_, node_delta) = self + let node_delta = self .node_deltas .iter_mut() - .find(|(chitchat_id, _)| chitchat_id == target_chitchat_id) + .find(|node_delta| &node_delta.chitchat_id == chitchat_id) .unwrap(); - - node_delta.max_version = node_delta.max_version.max(version); node_delta.key_values.push(( key.to_string(), VersionedValue { @@ -289,11 +286,10 @@ impl Delta { )); } - pub(crate) fn get(&self, target_chitchat_id: &ChitchatId) -> Option<&NodeDelta> { + pub(crate) fn get(&self, chitchat_id: &ChitchatId) -> Option<&NodeDelta> { self.node_deltas .iter() - .find(|(chitchat_id, _)| chitchat_id == target_chitchat_id) - .map(|(_, node_deltas)| node_deltas) + .find(|node_delta| &node_delta.chitchat_id == chitchat_id) } pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) { @@ -301,12 +297,11 @@ impl Delta { } } -#[derive(Debug, Default, Eq, PartialEq, serde::Serialize)] +#[derive(Debug, Eq, PartialEq, serde::Serialize)] pub(crate) struct NodeDelta { + pub chitchat_id: ChitchatId, pub heartbeat: Heartbeat, pub key_values: Vec<(String, VersionedValue)>, - // This attribute is computed upon deserialization. 0 if `key_values` is empty. - pub max_version: MaxVersion, } #[cfg(test)] @@ -320,8 +315,7 @@ impl NodeDelta { pub(crate) struct DeltaBuilder { existing_nodes: HashSet, delta: Delta, - current_chitchat_id: Option, - current_node_delta: NodeDelta, + current_node_delta: Option, } impl DeltaBuilder { @@ -336,22 +330,23 @@ impl DeltaBuilder { chitchat_id, heartbeat, } => { - anyhow::ensure!(self.current_chitchat_id.as_ref() != Some(&chitchat_id)); + self.flush(); anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id)); self.existing_nodes.insert(chitchat_id.clone()); - self.flush(); - self.current_chitchat_id = Some(chitchat_id); - self.current_node_delta.heartbeat = heartbeat; + self.current_node_delta = Some(NodeDelta { + chitchat_id, + heartbeat, + key_values: Vec::new(), + }); } DeltaOp::KeyValue { key, versioned_value, } => { - self.current_node_delta.max_version = self - .current_node_delta - .max_version - .max(versioned_value.version); - self.current_node_delta + let Some(current_node_delta) = self.current_node_delta.as_mut() else { + anyhow::bail!("received a key-value op without a node op before."); + }; + current_node_delta .key_values .push((key.to_string(), versioned_value)); } @@ -364,14 +359,13 @@ impl DeltaBuilder { } fn flush(&mut self) { - let Some(chitchat_id) = std::mem::take(&mut self.current_chitchat_id) else { + let Some(node_delta) = self.current_node_delta.take() else { // There are no nodes in the builder. // (this happens when the delta builder is freshly created and no ops have been received // yet.) return; }; - let node_delta = std::mem::take(&mut self.current_node_delta); - self.delta.node_deltas.push((chitchat_id, node_delta)); + self.delta.node_deltas.push(node_delta); } } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index af4bbaf..4006cdd 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -112,7 +112,7 @@ impl Chitchat { let dead_nodes: HashSet<_> = self.dead_nodes().collect(); let self_digest = self.compute_digest(); let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - digest.serialized_len(); - let delta = self.cluster_state.compute_delta( + let delta = self.cluster_state.compute_partial_delta_respecting_mtu( &digest, delta_mtu, &dead_nodes, @@ -126,7 +126,7 @@ impl Chitchat { ChitchatMessage::SynAck { digest, delta } => { self.process_delta(delta); let dead_nodes = self.dead_nodes().collect::>(); - let delta = self.cluster_state.compute_delta( + let delta = self.cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1, &dead_nodes, @@ -156,16 +156,17 @@ impl Chitchat { /// Reports heartbeats to the failure detector for nodes in the delta for which we received an /// update. fn report_heartbeats(&mut self, delta: &Delta) { - for (chitchat_id, node_delta) in &delta.node_deltas { - if let Some(node_state) = self.cluster_state.node_states.get(chitchat_id) { - if node_state.heartbeat() < node_delta.heartbeat - || node_state.max_version() < node_delta.max_version - { - self.failure_detector.report_heartbeat(chitchat_id); + for node_delta in &delta.node_deltas { + if let Some(node_state) = self.cluster_state.node_states.get(&node_delta.chitchat_id) { + if node_state.heartbeat() < node_delta.heartbeat { + self.failure_detector + .report_heartbeat(&node_delta.chitchat_id); } } else { - self.failure_detector.report_unknown(chitchat_id); - self.failure_detector.update_node_liveness(chitchat_id); + self.failure_detector + .report_unknown(&node_delta.chitchat_id); + self.failure_detector + .update_node_liveness(&node_delta.chitchat_id); } } } diff --git a/chitchat/src/serialize.rs b/chitchat/src/serialize.rs index 548777b..d225a15 100644 --- a/chitchat/src/serialize.rs +++ b/chitchat/src/serialize.rs @@ -329,11 +329,16 @@ impl Deserializable for Heartbeat { } } +/// A compressed stream writer receives a sequence of `Serializable` and +/// serialize/compresses into blocks of a configurable size. +/// +/// Block are tagged, so that blocks with a high entropy are stored kept "uncompressed". +/// +/// The stream gives the client an upperbound of what the overall payload length would +/// be if another item was appended. +/// This makes it possible to enforce a `mtu`. pub struct CompressedStreamWriter { output: Vec, - // number of blocks written in output. - num_blocks: u16, - // temporary buffer used for block compression. uncompressed_block: Vec, // ongoing block being serialized. @@ -350,7 +355,6 @@ impl CompressedStreamWriter { uncompressed_block: Vec::with_capacity(block_threshold * 2), compressed_block: Vec::with_capacity(block_threshold), block_threshold, - num_blocks: 0, } } @@ -364,6 +368,7 @@ impl CompressedStreamWriter { 1 // End of stream flag } + /// Appends a new item to the stream. Items must be at most `u16::MAX` bytes long. pub fn append(&mut self, item: &S) { let item_len = item.serialized_len(); assert!(item_len <= u16::MAX as usize); @@ -408,7 +413,6 @@ impl CompressedStreamWriter { self.output.extend(&self.uncompressed_block); } } - self.num_blocks += 1; self.uncompressed_block.clear(); self.compressed_block.clear(); } @@ -455,7 +459,7 @@ pub fn deserialize_stream(buf: &mut &[u8]) -> anyhow::Result< } }; } - anyhow::bail!("compressed streams error: reached end of buffer without NoMoreBlock tag"); + anyhow::bail!("compressed stream error: reached end of buffer without NoMoreBlock tag"); } #[derive(Eq, PartialEq, Debug)] @@ -530,6 +534,7 @@ pub fn test_serdeser_aux = deserialize_stream(&mut &buf[..]).unwrap(); + assert!(buf.len() <= len_upper_bound); + assert_eq!(vals.len(), payload.len()); + for (left, right) in vals.iter().zip(payload.iter()) { + assert_eq!(left, right); + } + } + } } diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 481cbaa..4124607 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -414,9 +414,7 @@ mod tests { use tokio_stream::{Stream, StreamExt}; use super::*; - use crate::delta::Delta; use crate::message::ChitchatMessage; - use crate::serialize::Deserializable; use crate::transport::{ChannelTransport, Transport}; use crate::{Heartbeat, NodeState, MAX_UDP_DATAGRAM_PAYLOAD_SIZE}; diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index d42bb66..cdd9d35 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tracing::warn; -use crate::delta::{Delta, DeltaWriter}; +use crate::delta::{Delta, DeltaWriter, NodeDelta}; use crate::digest::{Digest, NodeDigest}; use crate::listener::Listeners; use crate::{ChitchatId, Heartbeat, KeyChangeEvent, MaxVersion, Version, VersionedValue}; @@ -296,17 +296,21 @@ impl ClusterState { .retain(|chitchat_id, _| !delta.nodes_to_reset.contains(chitchat_id)); // Apply delta. - for (chitchat_id, node_delta) in delta.node_deltas { + for node_delta in delta.node_deltas { + let NodeDelta { + chitchat_id, + heartbeat, + key_values, + } = node_delta; let node_state = self .node_states .entry(chitchat_id.clone()) .or_insert_with(|| NodeState::new(chitchat_id, self.listeners.clone())); - if node_state.heartbeat < node_delta.heartbeat { - node_state.heartbeat = node_delta.heartbeat; + if node_state.heartbeat < heartbeat { + node_state.heartbeat = heartbeat; node_state.last_heartbeat = Instant::now(); } - node_state.max_version = node_state.max_version.max(node_delta.max_version); - for (key, versioned_value) in node_delta.key_values { + for (key, versioned_value) in key_values { node_state.max_version = node_state.max_version.max(versioned_value.version); node_state.set_versioned_value(key, versioned_value); } @@ -337,7 +341,7 @@ impl ClusterState { } /// Implements the Scuttlebutt reconciliation with the scuttle-depth ordering. - pub fn compute_delta( + pub fn compute_partial_delta_respecting_mtu( &self, digest: &Digest, mtu: usize, @@ -535,7 +539,7 @@ fn random_generator() -> impl Rng { #[cfg(test)] mod tests { use super::*; - use crate::serialize::{Deserializable, Serializable}; + use crate::serialize::Serializable; use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE; #[test] @@ -944,12 +948,12 @@ mod tests { dead_nodes: &HashSet<&ChitchatId>, expected_delta_atoms: &[(&ChitchatId, &str, &str, Version, Option)], ) { - let max_delta = cluster_state.compute_delta(digest, usize::MAX, dead_nodes, 10_000); + let max_delta = cluster_state.compute_partial_delta_respecting_mtu(digest, usize::MAX, dead_nodes, 10_000); let mut buf = Vec::new(); Serializable::serialize(&max_delta, &mut buf); let mut mtu_per_num_entries = Vec::new(); for mtu in 100..buf.len() { - let delta = cluster_state.compute_delta(digest, mtu, dead_nodes, 10_000); + let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes, 10_000); let num_tuples = delta.num_tuples(); if mtu_per_num_entries.len() == num_tuples + 1 { continue; @@ -965,11 +969,11 @@ mod tests { expected_delta.add_kv(node, key, val, version, tombstone); } { - let delta = cluster_state.compute_delta(digest, mtu, dead_nodes, 10_000); + let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes, 10_000); assert_eq!(&delta, &expected_delta); } { - let delta = cluster_state.compute_delta(digest, mtu + 1, dead_nodes, 10_000); + let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu + 1, dead_nodes, 10_000); assert_eq!(&delta, &expected_delta); } } @@ -1098,7 +1102,7 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); digest.add_node(node1.clone(), Heartbeat(0), 1); { - let delta = cluster_state.compute_delta( + let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), @@ -1116,7 +1120,7 @@ mod tests { // Node 1 heartbeat in digest + grace period (9_999) is inferior to the // node1's hearbeat in the cluster state. Thus we expect the cluster to compute a // delta that will reset node 1. - let delta = cluster_state.compute_delta( + let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), diff --git a/chitchat/src/transport/channel.rs b/chitchat/src/transport/channel.rs index 11ba515..137076c 100644 --- a/chitchat/src/transport/channel.rs +++ b/chitchat/src/transport/channel.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use anyhow::{bail, Context}; use async_trait::async_trait; use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::{debug, info}; +use tracing::info; use crate::serialize::Serializable; use crate::transport::{Socket, Transport}; @@ -98,7 +98,6 @@ impl ChannelTransport { bail!("Serialized message size exceeds MTU."); } } - debug!(num_bytes = num_bytes, "send"); let mut inner_lock = self.inner.lock().unwrap(); inner_lock.statistics.record_message_len(num_bytes); if let Some(to_addrs) = inner_lock.removed_links.get(&from_addr) { diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index 725973d..1d4a0cf 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -9,7 +9,7 @@ use chitchat::{ }; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; -use tracing::{debug, info}; +use tracing::{debug, error, info}; #[derive(Debug)] enum Operation { @@ -158,7 +158,7 @@ impl Simulator { } assert!(predicate_value); } else { - info!(node_id=%chitchat_id.node_id, state_snapshot=?chitchat_guard.state_snapshot(), "Node state missing."); + error!(node_id=%chitchat_id.node_id, state_snapshot=?chitchat_guard.state_snapshot(), "Node state missing."); panic!("Node state missing"); } } @@ -326,7 +326,8 @@ async fn test_simple_simulation_with_network_partition() { #[tokio::test] async fn test_marked_for_deletion_gc_with_network_partition() { - // let _ = tracing_subscriber::fmt::try_init(); + const TIMEOUT: Duration = Duration::from_millis(500); + let _ = tracing_subscriber::fmt::try_init(); let mut simulator = Simulator::new(Duration::from_millis(100), 10); let chitchat_id_1 = create_chitchat_id("node-1"); let chitchat_id_2 = create_chitchat_id("node-2"); @@ -358,13 +359,13 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_2.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), - timeout_opt: Some(Duration::from_millis(300)), + timeout_opt: Some(TIMEOUT), }, Operation::NodeStateAssert { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), - timeout_opt: Some(Duration::from_millis(300)), + timeout_opt: Some(TIMEOUT), }, // Isolate node 3. Operation::RemoveNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()), @@ -379,7 +380,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_2.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), true), - timeout_opt: Some(Duration::from_millis(300)), + timeout_opt: Some(TIMEOUT), }, // Check marked for deletion is not propagated to node 3. Operation::NodeStateAssert { @@ -394,7 +395,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_2.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), - timeout_opt: Some(Duration::from_millis(300)), + timeout_opt: Some(TIMEOUT), }, Operation::NodeStateAssert { server_chitchat_id: chitchat_id_1.clone(), @@ -411,7 +412,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { }, // Wait for propagation // We need to wait longer... because node 4 is just starting? - Operation::Wait(Duration::from_millis(500)), + Operation::Wait(TIMEOUT), Operation::NodeStateAssert { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), @@ -422,7 +423,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_4.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), - timeout_opt: None, + timeout_opt: Some(TIMEOUT), }, // Relink node 3 Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()), @@ -431,13 +432,13 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), - timeout_opt: Some(Duration::from_millis(500)), + timeout_opt: Some(TIMEOUT), }, Operation::NodeStateAssert { server_chitchat_id: chitchat_id_4.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), - timeout_opt: Some(Duration::from_millis(500)), + timeout_opt: Some(TIMEOUT), }, ]; simulator.execute(operations).await;