Skip to content

Commit

Permalink
code simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 12, 2024
1 parent bfc4bcf commit 167c913
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 97 deletions.
1 change: 1 addition & 0 deletions chitchat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ zstd = "0.13"
assert-json-diff = "2"
mock_instant = "0.3"
tracing-subscriber = "0.3"
proptest = "1.4"

[features]
testsuite = []
98 changes: 46 additions & 52 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::collections::HashSet;

use crate::serialize::*;
use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue};
use crate::{ChitchatId, Heartbeat, VersionedValue};

/// A delta is the message we send to another node to update it.
///
/// Its serialization is done by transforming it into a sequence of operations,
/// encoded one after the other in a compressed stream.
#[derive(Debug, Default, Eq, PartialEq)]
pub struct Delta {
pub(crate) nodes_to_reset: Vec<ChitchatId>,
pub(crate) node_deltas: Vec<(ChitchatId, NodeDelta)>,
pub(crate) node_deltas: Vec<NodeDelta>,
}

impl Delta {
Expand All @@ -15,21 +19,18 @@ impl Delta {
.nodes_to_reset
.iter()
.map(|node_to_reset| DeltaOpRef::NodeToReset(node_to_reset));
let node_deltas = self
.node_deltas
.iter()
.flat_map(|(chitchat_id, node_delta)| {
std::iter::once(DeltaOpRef::Node {
chitchat_id,
heartbeat: node_delta.heartbeat,
})
.chain(node_delta.key_values.iter().map(
|(key, versioned_value)| DeltaOpRef::KeyValue {
key,
versioned_value,
},
))
});
let node_deltas = self.node_deltas.iter().flat_map(|node_delta| {
std::iter::once(DeltaOpRef::Node {
chitchat_id: &node_delta.chitchat_id,
heartbeat: node_delta.heartbeat,
})
.chain(node_delta.key_values.iter().map(|(key, versioned_value)| {
DeltaOpRef::KeyValue {
key,
versioned_value,
}
}))
});
nodes_to_reset_ops.chain(node_deltas)
}
}
Expand Down Expand Up @@ -245,40 +246,36 @@ impl Delta {
pub fn num_tuples(&self) -> usize {
self.node_deltas
.iter()
.map(|(_, node_delta)| node_delta.num_tuples())
.map(|node_delta| node_delta.num_tuples())
.sum()
}

pub fn add_node(&mut self, target_chitchat_id: ChitchatId, heartbeat: Heartbeat) {
pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) {
assert!(self
.node_deltas
.iter()
.find(|(chitchat_id, node_delta)| { chitchat_id == &target_chitchat_id })
.find(|node_delta| { &node_delta.chitchat_id == &chitchat_id })
.is_none());
self.node_deltas.push((
target_chitchat_id,
NodeDelta {
heartbeat,
..Default::default()
},
));
self.node_deltas.push(NodeDelta {
chitchat_id,
heartbeat,
key_values: Vec::new(),
});
}

pub fn add_kv(
&mut self,
target_chitchat_id: &ChitchatId,
chitchat_id: &ChitchatId,
key: &str,
value: &str,
version: crate::Version,
tombstone: Option<u64>,
) {
let (_, node_delta) = self
let node_delta = self
.node_deltas
.iter_mut()
.find(|(chitchat_id, _)| chitchat_id == target_chitchat_id)
.find(|node_delta| &node_delta.chitchat_id == chitchat_id)
.unwrap();

node_delta.max_version = node_delta.max_version.max(version);
node_delta.key_values.push((
key.to_string(),
VersionedValue {
Expand All @@ -289,24 +286,22 @@ impl Delta {
));
}

pub(crate) fn get(&self, target_chitchat_id: &ChitchatId) -> Option<&NodeDelta> {
pub(crate) fn get(&self, chitchat_id: &ChitchatId) -> Option<&NodeDelta> {
self.node_deltas
.iter()
.find(|(chitchat_id, _)| chitchat_id == target_chitchat_id)
.map(|(_, node_deltas)| node_deltas)
.find(|node_delta| &node_delta.chitchat_id == chitchat_id)
}

pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) {
self.nodes_to_reset.push(chitchat_id);
}
}

#[derive(Debug, Default, Eq, PartialEq, serde::Serialize)]
#[derive(Debug, Eq, PartialEq, serde::Serialize)]
pub(crate) struct NodeDelta {
pub chitchat_id: ChitchatId,
pub heartbeat: Heartbeat,
pub key_values: Vec<(String, VersionedValue)>,
// This attribute is computed upon deserialization. 0 if `key_values` is empty.
pub max_version: MaxVersion,
}

#[cfg(test)]
Expand All @@ -320,8 +315,7 @@ impl NodeDelta {
pub(crate) struct DeltaBuilder {
existing_nodes: HashSet<ChitchatId>,
delta: Delta,
current_chitchat_id: Option<ChitchatId>,
current_node_delta: NodeDelta,
current_node_delta: Option<NodeDelta>,
}

impl DeltaBuilder {
Expand All @@ -336,22 +330,23 @@ impl DeltaBuilder {
chitchat_id,
heartbeat,
} => {
anyhow::ensure!(self.current_chitchat_id.as_ref() != Some(&chitchat_id));
self.flush();
anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id));
self.existing_nodes.insert(chitchat_id.clone());
self.flush();
self.current_chitchat_id = Some(chitchat_id);
self.current_node_delta.heartbeat = heartbeat;
self.current_node_delta = Some(NodeDelta {
chitchat_id,
heartbeat,
key_values: Vec::new(),
});
}
DeltaOp::KeyValue {
key,
versioned_value,
} => {
self.current_node_delta.max_version = self
.current_node_delta
.max_version
.max(versioned_value.version);
self.current_node_delta
let Some(current_node_delta) = self.current_node_delta.as_mut() else {
anyhow::bail!("received a key-value op without a node op before.");
};
current_node_delta
.key_values
.push((key.to_string(), versioned_value));
}
Expand All @@ -364,14 +359,13 @@ impl DeltaBuilder {
}

fn flush(&mut self) {
let Some(chitchat_id) = std::mem::take(&mut self.current_chitchat_id) else {
let Some(node_delta) = self.current_node_delta.take() else {
// There are no nodes in the builder.
// (this happens when the delta builder is freshly created and no ops have been received
// yet.)
return;
};
let node_delta = std::mem::take(&mut self.current_node_delta);
self.delta.node_deltas.push((chitchat_id, node_delta));
self.delta.node_deltas.push(node_delta);
}
}

Expand Down
21 changes: 11 additions & 10 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Chitchat {
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let self_digest = self.compute_digest();
let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - digest.serialized_len();
let delta = self.cluster_state.compute_delta(
let delta = self.cluster_state.compute_partial_delta_respecting_mtu(
&digest,
delta_mtu,
&dead_nodes,
Expand All @@ -126,7 +126,7 @@ impl Chitchat {
ChitchatMessage::SynAck { digest, delta } => {
self.process_delta(delta);
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let delta = self.cluster_state.compute_delta(
let delta = self.cluster_state.compute_partial_delta_respecting_mtu(
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
&dead_nodes,
Expand Down Expand Up @@ -156,16 +156,17 @@ impl Chitchat {
/// Reports heartbeats to the failure detector for nodes in the delta for which we received an
/// update.
fn report_heartbeats(&mut self, delta: &Delta) {
for (chitchat_id, node_delta) in &delta.node_deltas {
if let Some(node_state) = self.cluster_state.node_states.get(chitchat_id) {
if node_state.heartbeat() < node_delta.heartbeat
|| node_state.max_version() < node_delta.max_version
{
self.failure_detector.report_heartbeat(chitchat_id);
for node_delta in &delta.node_deltas {
if let Some(node_state) = self.cluster_state.node_states.get(&node_delta.chitchat_id) {
if node_state.heartbeat() < node_delta.heartbeat {
self.failure_detector
.report_heartbeat(&node_delta.chitchat_id);
}
} else {
self.failure_detector.report_unknown(chitchat_id);
self.failure_detector.update_node_liveness(chitchat_id);
self.failure_detector
.report_unknown(&node_delta.chitchat_id);
self.failure_detector
.update_node_liveness(&node_delta.chitchat_id);
}
}
}
Expand Down
37 changes: 31 additions & 6 deletions chitchat/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,16 @@ impl Deserializable for Heartbeat {
}
}

/// A compressed stream writer receives a sequence of `Serializable` items and
/// serializes/compresses them into blocks of a configurable size.
///
/// Blocks are tagged, so that blocks with a high entropy are kept "uncompressed".
///
/// The stream gives the client an upper bound of what the overall payload length would
/// be if another item were appended.
/// This makes it possible to enforce a `mtu`.
pub struct CompressedStreamWriter {
output: Vec<u8>,
// number of blocks written in output.
num_blocks: u16,

// temporary buffer used for block compression.
uncompressed_block: Vec<u8>,
// ongoing block being serialized.
Expand All @@ -350,7 +355,6 @@ impl CompressedStreamWriter {
uncompressed_block: Vec::with_capacity(block_threshold * 2),
compressed_block: Vec::with_capacity(block_threshold),
block_threshold,
num_blocks: 0,
}
}

Expand All @@ -364,6 +368,7 @@ impl CompressedStreamWriter {
1 // End of stream flag
}

/// Appends a new item to the stream. Items must be at most `u16::MAX` bytes long.
pub fn append<S: Serializable + ?Sized>(&mut self, item: &S) {
let item_len = item.serialized_len();
assert!(item_len <= u16::MAX as usize);
Expand Down Expand Up @@ -408,7 +413,6 @@ impl CompressedStreamWriter {
self.output.extend(&self.uncompressed_block);
}
}
self.num_blocks += 1;
self.uncompressed_block.clear();
self.compressed_block.clear();
}
Expand Down Expand Up @@ -455,7 +459,7 @@ pub fn deserialize_stream<D: Deserializable>(buf: &mut &[u8]) -> anyhow::Result<
}
};
}
anyhow::bail!("compressed streams error: reached end of buffer without NoMoreBlock tag");
anyhow::bail!("compressed stream error: reached end of buffer without NoMoreBlock tag");
}

#[derive(Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -530,6 +534,7 @@ pub fn test_serdeser_aux<T: Serializable + Deserializable + PartialEq + std::fmt

#[cfg(test)]
mod tests {
use proptest::proptest;
use rand::distributions::Alphanumeric;
use rand::Rng;

Expand Down Expand Up @@ -640,4 +645,24 @@ mod tests {
}
assert!(cursor.is_empty());
}

proptest! {
    // Round-trips a random batch of strings through the compressed stream and
    // checks that the length upper bound announced before the final append
    // really bounds the finished buffer.
    #[test]
    fn test_proptest_compressed_stream(payload in proptest::collection::vec(".{0,1000}", 1..100)) {
        // The strategy yields between 1 and 99 items, so there is always a last one.
        let (last_item, leading_items) = payload.split_last().unwrap();

        let mut writer: CompressedStreamWriter =
            CompressedStreamWriter::with_block_threshold(1_000);
        for item in leading_items {
            writer.append(item);
        }

        // The bound has to be queried *before* the last item is appended.
        let len_upper_bound = writer.serialized_len_upperbound_after(last_item);
        writer.append(last_item);

        let buf = writer.finish();
        let vals: Vec<String> = deserialize_stream(&mut &buf[..]).unwrap();

        assert!(buf.len() <= len_upper_bound);
        assert_eq!(vals.len(), payload.len());
        vals.iter()
            .zip(payload.iter())
            .for_each(|(decoded, expected)| assert_eq!(decoded, expected));
    }
}
}
2 changes: 0 additions & 2 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,7 @@ mod tests {
use tokio_stream::{Stream, StreamExt};

use super::*;
use crate::delta::Delta;
use crate::message::ChitchatMessage;
use crate::serialize::Deserializable;
use crate::transport::{ChannelTransport, Transport};
use crate::{Heartbeat, NodeState, MAX_UDP_DATAGRAM_PAYLOAD_SIZE};

Expand Down
Loading

0 comments on commit 167c913

Please sign in to comment.