From 1201021825e4665ff0792ae5eb83cb12b01a148c Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Fri, 16 Aug 2024 17:02:43 +0300 Subject: [PATCH 01/10] feat: moved leader computation to party, added ballot number to seed --- src/party.rs | 153 ++++++++++++++++++++++++++------------------------- 1 file changed, 78 insertions(+), 75 deletions(-) diff --git a/src/party.rs b/src/party.rs index 85f6b6f..71eab47 100644 --- a/src/party.rs +++ b/src/party.rs @@ -24,62 +24,16 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, - - /// Leader of the ballot, computed using seed obtained from config. - leader: u64, // TODO: define other config fields. } impl BPConConfig { /// Create new config instance. pub fn new(party_weights: Vec, threshold: u128) -> Self { - let mut cfg = Self { + Self { party_weights, threshold, - leader: 0, - }; - cfg.leader = cfg.compute_leader().unwrap(); - - cfg - } - - /// Compute leader in a weighed randomized manner. - /// Uses seed from the config, making it deterministic. - fn compute_leader(&self) -> Result { - let seed = self.compute_seed(); - - let total_weight: u64 = self.party_weights.iter().sum(); - if total_weight == 0 { - return Err(BallotError::LeaderElection("Zero weight sum".into())); - } - - // Use the seed from the config to create a deterministic random number generator. - let mut rng = StdRng::seed_from_u64(seed); - - let random_value: u64 = rng.gen_range(0..total_weight); - - let mut cumulative_weight = 0; - for (i, &weight) in self.party_weights.iter().enumerate() { - cumulative_weight += weight; - if random_value < cumulative_weight { - return Ok(i as u64); - } } - Err(BallotError::LeaderElection("Election failed".into())) - } - - /// Compute seed for randomized leader election. - fn compute_seed(&self) -> u64 { - let mut hasher = DefaultHasher::new(); - - // Hash each field that should contribute to the seed - self.party_weights.hash(&mut hasher); - self.threshold.hash(&mut hasher); - - // You can add more fields as needed - - // Generate the seed from the hash - hasher.finish() } } @@ -258,6 +212,46 @@ impl> Party { None } + /// Compute leader in a weighed randomized manner. + /// Uses seed from the config, making it deterministic. + pub fn get_leader(&self) -> Result { + let seed = self.compute_seed(); + + let total_weight: u64 = self.cfg.party_weights.iter().sum(); + if total_weight == 0 { + return Err(BallotError::LeaderElection("Zero weight sum".into())); + } + + // Use the seed from the config to create a deterministic random number generator. + let mut rng = StdRng::seed_from_u64(seed); + + let random_value: u64 = rng.gen_range(0..total_weight); + + let mut cumulative_weight = 0; + for (i, &weight) in self.cfg.party_weights.iter().enumerate() { + cumulative_weight += weight; + if random_value < cumulative_weight { + return Ok(i as u64); + } + } + Err(BallotError::LeaderElection("Election failed".into())) + } + + /// Compute seed for randomized leader election. + fn compute_seed(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + + // Hash each field that should contribute to the seed + self.cfg.party_weights.hash(&mut hasher); + self.cfg.threshold.hash(&mut hasher); + self.ballot.hash(&mut hasher); + + // You can add more fields as needed + + // Generate the seed from the hash + hasher.finish() + } + fn get_value(&self) -> V { self.value_selector.select(&self.parties_voted_before) } @@ -356,7 +350,7 @@ impl> Party { )); } - if routing.sender != self.cfg.leader { + if routing.sender != self.get_leader()? { return Err(BallotError::InvalidState("Invalid leader in Msg1a".into())); } @@ -422,7 +416,7 @@ impl> Party { )); } - if routing.sender != self.cfg.leader { + if routing.sender != self.get_leader()? { return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); } @@ -526,7 +520,7 @@ impl> Party { "Cannot launch 1a, incorrect state".into(), )); } - if self.cfg.leader == self.id { + if self.get_leader()? == self.id { self.msg_out_sender .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message1aContent { @@ -576,7 +570,7 @@ impl> Party { "Cannot launch 2a, incorrect state".into(), )); } - if self.cfg.leader == self.id { + if self.get_leader()? == self.id { self.msg_out_sender .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message2aContent { @@ -672,16 +666,16 @@ mod tests { #[test] fn test_compute_leader_determinism() { - let party_weights = vec![1, 2, 7]; // Weighted case - let threshold = 7; // example threshold - - // Initialize the configuration once - let config = BPConConfig::new(party_weights.clone(), threshold); + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + }; + let party = Party::::new(0, cfg, MockValueSelector).0; // Compute the leader multiple times - let leader1 = config.compute_leader().unwrap(); - let leader2 = config.compute_leader().unwrap(); - let leader3 = config.compute_leader().unwrap(); + let leader1 = party.get_leader().unwrap(); + let leader2 = party.get_leader().unwrap(); + let leader3 = party.get_leader().unwrap(); // All leaders should be the same due to deterministic seed assert_eq!( @@ -695,13 +689,19 @@ mod tests { } #[test] - #[should_panic] fn test_compute_leader_zero_weights() { - let party_weights = vec![0, 0, 0]; - let threshold = 1; // example threshold + let cfg = BPConConfig { + party_weights: vec![0, 0, 0], + threshold: 4, + }; + let party = Party::::new(0, cfg, MockValueSelector).0; - // Create the config, which will attempt to compute the leader - BPConConfig::new(party_weights, threshold); + match party.get_leader() { + Err(BallotError::LeaderElection(_)) => { + // The test passes if the error is of type LeaderElection + } + _ => panic!("Expected BallotError::LeaderElection"), + } } #[test] @@ -709,15 +709,17 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, - leader: 1, }; let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Launched; party.ballot = 1; + // Must send this message from leader of the ballot. + let leader = party.get_leader().unwrap(); + let msg = Message1aContent { ballot: 1 }; let routing = MessageRouting { - sender: 1, + sender: leader, receivers: vec![2, 3], is_broadcast: false, msg_type: ProtocolMessage::Msg1a, @@ -739,7 +741,6 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], // Total weight is 6 threshold: 4, // Threshold is 4 - leader: 1, }; let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Passed1a; @@ -798,18 +799,20 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, - leader: 1, }; let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Passed1b; party.ballot = 1; + // Must send this message from leader of the ballot. + let leader = party.get_leader().unwrap(); + let msg = Message2aContent { ballot: 1, value: bincode::serialize(&MockValue(42)).unwrap(), }; let routing = MessageRouting { - sender: 1, + sender: leader, receivers: vec![0], is_broadcast: false, msg_type: ProtocolMessage::Msg2a, @@ -832,7 +835,6 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, - leader: 1, }; let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Passed2a; @@ -890,7 +892,6 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, - leader: 1, }; let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Passed2av; @@ -957,7 +958,6 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, - leader: 0, }; let (mut party, _msg_out_receiver, _msg_in_sender) = Party::::new(0, cfg, MockValueSelector); @@ -978,7 +978,6 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, - leader: 0, }; let (mut party, _, _) = Party::::new(0, cfg, MockValueSelector); @@ -1004,10 +1003,14 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, - leader: 0, }; + + // This party id is precomputed for this specific party_weights, threshold and ballot. + // Because we need leader to send 1a. + let party_id = 2; + let (mut party, msg_out_receiver, _) = - Party::::new(0, cfg, MockValueSelector); + Party::::new(party_id, cfg, MockValueSelector); party.status = PartyStatus::Launched; party.ballot = 1; From 70027e30399b7bd7ba56fd8b7ff6cb6087585b3b Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Fri, 16 Aug 2024 18:58:10 +0300 Subject: [PATCH 02/10] feat: add configuration of timebounds for events, async --- Cargo.toml | 1 + src/party.rs | 288 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 186 insertions(+), 103 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4383852..9cb07a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ log = "0.4.22" rkyv = { version = "0.7.44", features = ["validation"]} serde = { version = "1.0.207", features = ["derive"] } bincode = "1.3.3" +tokio = { version = "1.39.2", features = ["full", "test-util"] } diff --git a/src/party.rs b/src/party.rs index 71eab47..77c41e9 100644 --- a/src/party.rs +++ b/src/party.rs @@ -15,26 +15,35 @@ use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::Entry::Vacant; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::time::{self, Duration}; /// BPCon configuration. Includes ballot time bounds and other stuff. +#[derive(PartialEq, Eq, Debug, Clone)] pub struct BPConConfig { /// Parties weights: `party_weights[i]` corresponds to the i-th party weight pub party_weights: Vec, /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, - // TODO: define other config fields. -} -impl BPConConfig { - /// Create new config instance. - pub fn new(party_weights: Vec, threshold: u128) -> Self { - Self { - party_weights, - threshold, - } - } + /// Timeout before 1a stage is launched. + pub launch1a_timeout: Duration, + + /// Timeout before 1b stage is launched. + pub launch1b_timeout: Duration, + + /// Timeout before 2a stage is launched. + pub launch2a_timeout: Duration, + + /// Timeout before 2av stage is launched. + pub launch2av_timeout: Duration, + + /// Timeout before 2b stage is launched. + pub launch2b_timeout: Duration, + + /// Timeout before finalization stage is launched. + pub finalize_timeout: Duration, } /// Party status defines the statuses of the ballot for the particular participant @@ -53,7 +62,7 @@ pub(crate) enum PartyStatus { } /// Party events is used for the ballot flow control. -#[derive(PartialEq)] +#[derive(PartialEq, Debug)] pub(crate) enum PartyEvent { Launch1a, Launch1b, @@ -64,6 +73,7 @@ pub(crate) enum PartyEvent { } /// A struct to keep track of senders and the cumulative weight of their messages. +#[derive(PartialEq, Debug)] struct MessageRoundState { senders: HashSet, weight: u128, @@ -112,12 +122,12 @@ pub struct Party> { pub id: u64, /// Communication queues. - msg_in_receiver: Receiver, - msg_out_sender: Sender, + msg_in_receiver: UnboundedReceiver, + msg_out_sender: UnboundedSender, /// Query to receive and send events that run ballot protocol - event_receiver: Receiver, - event_sender: Sender, + event_receiver: UnboundedReceiver, + event_sender: UnboundedSender, /// BPCon config (e.g. ballot time bounds, parties weights, etc.). cfg: BPConConfig, @@ -162,10 +172,14 @@ impl> Party { id: u64, cfg: BPConConfig, value_selector: VS, - ) -> (Self, Receiver, Sender) { - let (event_sender, event_receiver) = channel(); - let (msg_in_sender, msg_in_receiver) = channel(); - let (msg_out_sender, msg_out_receiver) = channel(); + ) -> ( + Self, + UnboundedReceiver, + UnboundedSender, + ) { + let (event_sender, event_receiver) = unbounded_channel(); + let (msg_in_sender, msg_in_receiver) = unbounded_channel(); + let (msg_out_sender, msg_out_receiver) = unbounded_channel(); ( Self { @@ -261,51 +275,88 @@ impl> Party { pub async fn launch_ballot(&mut self) -> Result, BallotError> { self.prepare_next_ballot(); - while self.is_launched() { - if let Ok(msg_wire) = self.msg_in_receiver.try_recv() { - if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { - self.status = PartyStatus::Failed; - return Err(err); - } - } + self.status = PartyStatus::Launched; - if let Ok(event) = self.event_receiver.try_recv() { - if let Err(err) = self.follow_event(event) { - self.status = PartyStatus::Failed; - return Err(err); - } - } + let launch1a_timer = time::sleep(self.cfg.launch1a_timeout); + let launch1b_timer = time::sleep(self.cfg.launch1b_timeout); + let launch2a_timer = time::sleep(self.cfg.launch2a_timeout); + let launch2av_timer = time::sleep(self.cfg.launch2av_timeout); + let launch2b_timer = time::sleep(self.cfg.launch2a_timeout); + let finalize_timer = time::sleep(self.cfg.finalize_timeout); + + // Prevent the timers from firing immediately + tokio::pin!( + launch1a_timer, + launch1b_timer, + launch2a_timer, + launch2av_timer, + launch2b_timer, + finalize_timer + ); - // TODO: Emit events to run ballot protocol according to the ballot configuration - self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch1a event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch1b event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2a event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2av event".into()) - })?; - - self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2b event".into()) - })?; - - self.event_sender.send(PartyEvent::Finalize).map_err(|_| { - self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Finalize event".into()) - })?; + while self.is_launched() { + tokio::select! { + _ = &mut launch1a_timer => { + self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch1a event".into()) + })?; + }, + _ = &mut launch1b_timer => { + self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch1b event".into()) + })?; + }, + _ = &mut launch2a_timer => { + self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2a event".into()) + })?; + }, + _ = &mut launch2av_timer => { + self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2av event".into()) + })?; + }, + _ = &mut launch2b_timer => { + self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2b event".into()) + })?; + }, + _ = &mut finalize_timer => { + self.event_sender.send(PartyEvent::Finalize).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Finalize event".into()) + })?; + }, + msg_wire = self.msg_in_receiver.recv() => { + if let Some(msg_wire) = msg_wire { + if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { + self.status = PartyStatus::Failed; + return Err(err); + } + } else { + // Handle the case where the channel has been closed + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("Message channel closed".into())); + } + }, + event = self.event_receiver.recv() => { + if let Some(event) = event { + if let Err(err) = self.follow_event(event) { + self.status = PartyStatus::Failed; + return Err(err); + } + } else { + // Handle the case where the channel has been closed + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("Event channel closed".into())); + } + }, + } } Ok(self.get_value_selected()) @@ -664,12 +715,22 @@ mod tests { } } - #[test] - fn test_compute_leader_determinism() { - let cfg = BPConConfig { + fn default_config() -> BPConConfig { + BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, - }; + launch1a_timeout: Duration::from_secs(0), + launch1b_timeout: Duration::from_secs(10), + launch2a_timeout: Duration::from_secs(20), + launch2av_timeout: Duration::from_secs(30), + launch2b_timeout: Duration::from_secs(40), + finalize_timeout: Duration::from_secs(50), + } + } + + #[test] + fn test_compute_leader_determinism() { + let cfg = default_config(); let party = Party::::new(0, cfg, MockValueSelector).0; // Compute the leader multiple times @@ -690,10 +751,9 @@ mod tests { #[test] fn test_compute_leader_zero_weights() { - let cfg = BPConConfig { - party_weights: vec![0, 0, 0], - threshold: 4, - }; + let mut cfg = default_config(); + cfg.party_weights = vec![0, 0, 0]; + let party = Party::::new(0, cfg, MockValueSelector).0; match party.get_leader() { @@ -706,10 +766,7 @@ mod tests { #[test] fn test_update_state_msg1a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - }; + let cfg = default_config(); let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Launched; party.ballot = 1; @@ -738,10 +795,7 @@ mod tests { #[test] fn test_update_state_msg1b() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], // Total weight is 6 - threshold: 4, // Threshold is 4 - }; + let cfg = default_config(); let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Passed1a; party.ballot = 1; @@ -796,10 +850,7 @@ mod tests { #[test] fn test_update_state_msg2a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - }; + let cfg = default_config(); let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Passed1b; party.ballot = 1; @@ -832,10 +883,7 @@ mod tests { #[test] fn test_update_state_msg2av() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - }; + let cfg = default_config(); let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Passed2a; party.ballot = 1; @@ -889,10 +937,7 @@ mod tests { #[test] fn test_update_state_msg2b() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - }; + let cfg = default_config(); let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Passed2av; party.ballot = 1; @@ -955,10 +1000,7 @@ mod tests { #[test] fn test_follow_event_launch1a() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - }; + let cfg = default_config(); let (mut party, _msg_out_receiver, _msg_in_sender) = Party::::new(0, cfg, MockValueSelector); @@ -975,10 +1017,7 @@ mod tests { #[test] fn test_ballot_reset_after_failure() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - }; + let cfg = default_config(); let (mut party, _, _) = Party::::new(0, cfg, MockValueSelector); @@ -1000,10 +1039,7 @@ mod tests { #[test] fn test_follow_event_communication_failure() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - }; + let cfg = default_config(); // This party id is precomputed for this specific party_weights, threshold and ballot. // Because we need leader to send 1a. @@ -1029,4 +1065,50 @@ mod tests { _ => panic!("Expected BallotError::Communication, got {:?}", result), } } + + #[tokio::test] + #[ignore] // This test is unfinished, turning off temporarily. + async fn test_launch_ballot_timeouts() { + // Pause the Tokio time so we can manipulate it + time::pause(); + + // Set up the Party with necessary configuration + let cfg = default_config(); + + let mut party = + Party::::new(0, cfg.clone(), MockValueSelector).0; + + // Create the event and message channels to substitute for testing. + let (event_sender, mut event_receiver) = unbounded_channel(); + + party.event_sender = event_sender; + // Note that we don't change party.event_receiver + + // Spawn the launch_ballot function in a separate task + let ballot_task = tokio::spawn(async move { + party.launch_ballot().await.unwrap(); + }); + + // Fast-forward time and check that the correct event is sent after each interval + time::advance(cfg.launch1a_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a); + + time::advance(cfg.launch1b_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1b); + + time::advance(cfg.launch2a_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2a); + + time::advance(cfg.launch2av_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2av); + + time::advance(cfg.launch2b_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2b); + + time::advance(cfg.finalize_timeout).await; + assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize); + + // Ensure that the task completes successfully + ballot_task.await.unwrap(); + } } From 55dc0b4705c4e2272dc67cdacd899fa7353cf61d Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Tue, 20 Aug 2024 15:58:37 +0300 Subject: [PATCH 03/10] feat: use hash-based leader election instead of rand --- Cargo.toml | 1 - src/party.rs | 19 +++++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9cb07a3..2fc08a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rand = "0.9.0-alpha.2" log = "0.4.22" rkyv = { version = "0.7.44", features = ["validation"]} serde = { version = "1.0.207", features = ["derive"] } diff --git a/src/party.rs b/src/party.rs index 77c41e9..ba5b16a 100644 --- a/src/party.rs +++ b/src/party.rs @@ -6,9 +6,6 @@ use crate::message::{ MessagePacket, MessageRouting, ProtocolMessage, }; use crate::{Value, ValueSelector}; -use rand::prelude::StdRng; -use rand::prelude::*; -use rand::Rng; use rkyv::{AlignedVec, Deserialize, Infallible}; use std::cmp::PartialEq; use std::collections::hash_map::DefaultHasher; @@ -236,15 +233,10 @@ impl> Party { return Err(BallotError::LeaderElection("Zero weight sum".into())); } - // Use the seed from the config to create a deterministic random number generator. - let mut rng = StdRng::seed_from_u64(seed); - - let random_value: u64 = rng.gen_range(0..total_weight); - let mut cumulative_weight = 0; for (i, &weight) in self.cfg.party_weights.iter().enumerate() { cumulative_weight += weight; - if random_value < cumulative_weight { + if self.hash_to_range(seed, cumulative_weight) < weight { return Ok(i as u64); } } @@ -266,6 +258,13 @@ impl> Party { hasher.finish() } + /// Hash the seed to a value within a given range. + fn hash_to_range(&self, seed: u64, range: u64) -> u64 { + let mut hasher = DefaultHasher::new(); + seed.hash(&mut hasher); + hasher.finish() % range + } + fn get_value(&self) -> V { self.value_selector.select(&self.parties_voted_before) } @@ -1043,7 +1042,7 @@ mod tests { // This party id is precomputed for this specific party_weights, threshold and ballot. // Because we need leader to send 1a. - let party_id = 2; + let party_id = 0; let (mut party, msg_out_receiver, _) = Party::::new(party_id, cfg, MockValueSelector); From 2ebd22cb76d4e0f605ec66394711ed7c37401754 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Wed, 21 Aug 2024 20:57:42 +0300 Subject: [PATCH 04/10] fix: resolved events sending --- src/party.rs | 76 +++++++++++++++++++++++++++------------------------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/src/party.rs b/src/party.rs index ba5b16a..f3b595e 100644 --- a/src/party.rs +++ b/src/party.rs @@ -269,21 +269,17 @@ impl> Party { self.value_selector.select(&self.parties_voted_before) } - /// Start the next ballot. It's expected from the external system to re-run ballot protocol in - /// case of failed ballot. pub async fn launch_ballot(&mut self) -> Result, BallotError> { self.prepare_next_ballot(); - self.status = PartyStatus::Launched; let launch1a_timer = time::sleep(self.cfg.launch1a_timeout); let launch1b_timer = time::sleep(self.cfg.launch1b_timeout); let launch2a_timer = time::sleep(self.cfg.launch2a_timeout); let launch2av_timer = time::sleep(self.cfg.launch2av_timeout); - let launch2b_timer = time::sleep(self.cfg.launch2a_timeout); + let launch2b_timer = time::sleep(self.cfg.launch2b_timeout); let finalize_timer = time::sleep(self.cfg.finalize_timeout); - // Prevent the timers from firing immediately tokio::pin!( launch1a_timer, launch1b_timer, @@ -293,43 +289,56 @@ impl> Party { finalize_timer ); + let mut launch1a_fired = false; + let mut launch1b_fired = false; + let mut launch2a_fired = false; + let mut launch2av_fired = false; + let mut launch2b_fired = false; + let mut finalize_fired = false; + while self.is_launched() { tokio::select! { - _ = &mut launch1a_timer => { + _ = &mut launch1a_timer, if !launch1a_fired => { self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch1a event".into()) })?; + launch1a_fired = true; }, - _ = &mut launch1b_timer => { + _ = &mut launch1b_timer, if !launch1b_fired => { self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch1b event".into()) })?; + launch1b_fired = true; }, - _ = &mut launch2a_timer => { + _ = &mut launch2a_timer, if !launch2a_fired => { self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch2a event".into()) })?; + launch2a_fired = true; }, - _ = &mut launch2av_timer => { + _ = &mut launch2av_timer, if !launch2av_fired => { self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch2av event".into()) })?; + launch2av_fired = true; }, - _ = &mut launch2b_timer => { + _ = &mut launch2b_timer, if !launch2b_fired => { self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Launch2b event".into()) })?; + launch2b_fired = true; }, - _ = &mut finalize_timer => { + _ = &mut finalize_timer, if !finalize_fired => { self.event_sender.send(PartyEvent::Finalize).map_err(|_| { self.status = PartyStatus::Failed; BallotError::Communication("Failed to send Finalize event".into()) })?; + finalize_fired = true; }, msg_wire = self.msg_in_receiver.recv() => { if let Some(msg_wire) = msg_wire { @@ -337,10 +346,9 @@ impl> Party { self.status = PartyStatus::Failed; return Err(err); } - } else { - // Handle the case where the channel has been closed - self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Message channel closed".into())); + }else if self.msg_in_receiver.is_closed(){ + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("msg-in channel closed".into())); } }, event = self.event_receiver.recv() => { @@ -349,10 +357,9 @@ impl> Party { self.status = PartyStatus::Failed; return Err(err); } - } else { - // Handle the case where the channel has been closed + }else if self.event_receiver.is_closed(){ self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Event channel closed".into())); + return Err(BallotError::Communication("event receiver channel closed".into())); } }, } @@ -1066,48 +1073,45 @@ mod tests { } #[tokio::test] - #[ignore] // This test is unfinished, turning off temporarily. - async fn test_launch_ballot_timeouts() { + async fn test_launch_ballot_events() { // Pause the Tokio time so we can manipulate it time::pause(); // Set up the Party with necessary configuration let cfg = default_config(); - - let mut party = - Party::::new(0, cfg.clone(), MockValueSelector).0; - - // Create the event and message channels to substitute for testing. let (event_sender, mut event_receiver) = unbounded_channel(); + // Need to return all 3 values, so that they don't get dropped + // and associated channels don't get closed. + let (mut party, _msg_out_receiver, _msg_in_sender) = + Party::::new(0, cfg.clone(), MockValueSelector); + + // Same here, we would like to not lose party's event_receiver, so that test doesn't fail. + let _event_sender = party.event_sender; party.event_sender = event_sender; - // Note that we don't change party.event_receiver // Spawn the launch_ballot function in a separate task - let ballot_task = tokio::spawn(async move { + let _ballot_task = tokio::spawn(async move { party.launch_ballot().await.unwrap(); }); - // Fast-forward time and check that the correct event is sent after each interval + // Sequential time advance and event check time::advance(cfg.launch1a_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a); - time::advance(cfg.launch1b_timeout).await; + time::advance(cfg.launch1b_timeout - cfg.launch1a_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1b); - time::advance(cfg.launch2a_timeout).await; + time::advance(cfg.launch2a_timeout - cfg.launch1b_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2a); - time::advance(cfg.launch2av_timeout).await; + time::advance(cfg.launch2av_timeout - cfg.launch2a_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2av); - time::advance(cfg.launch2b_timeout).await; + time::advance(cfg.launch2b_timeout - cfg.launch2av_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch2b); - time::advance(cfg.finalize_timeout).await; + time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize); - - // Ensure that the task completes successfully - ballot_task.await.unwrap(); } } From 2fcddbf0e8db82e889b1db36516da78f2c38b747 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Wed, 21 Aug 2024 22:13:47 +0300 Subject: [PATCH 05/10] feat: added status checks to update_state --- src/party.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/party.rs b/src/party.rs index f3b595e..16a3e1f 100644 --- a/src/party.rs +++ b/src/party.rs @@ -391,6 +391,11 @@ impl> Party { fn update_state(&mut self, m: AlignedVec, routing: MessageRouting) -> Result<(), BallotError> { match routing.msg_type { ProtocolMessage::Msg1a => { + if self.status != PartyStatus::Launched { + return Err(BallotError::InvalidState( + "Received Msg1a message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -414,6 +419,11 @@ impl> Party { self.status = PartyStatus::Passed1a; } ProtocolMessage::Msg1b => { + if self.status != PartyStatus::Passed1a { + return Err(BallotError::InvalidState( + "Received Msg1b message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -457,6 +467,11 @@ impl> Party { } } ProtocolMessage::Msg2a => { + if self.status != PartyStatus::Passed1b { + return Err(BallotError::InvalidState( + "Received Msg2a message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -494,6 +509,11 @@ impl> Party { } } ProtocolMessage::Msg2av => { + if self.status != PartyStatus::Passed2a { + return Err(BallotError::InvalidState( + "Received Msg2av message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) @@ -535,6 +555,11 @@ impl> Party { } } ProtocolMessage::Msg2b => { + if self.status != PartyStatus::Passed2av { + return Err(BallotError::InvalidState( + "Received Msg2b message, while party status is not ".into(), + )); + } let archived = rkyv::check_archived_root::(&m[..]).map_err(|err| { BallotError::MessageParsing(format!("Validation error: {:?}", err)) From 890026c94a485a8984769397181c366c62a6eb99 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Wed, 21 Aug 2024 23:31:23 +0300 Subject: [PATCH 06/10] feat: add timeout handling for latency between parties --- src/party.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/party.rs b/src/party.rs index 16a3e1f..3dc36b4 100644 --- a/src/party.rs +++ b/src/party.rs @@ -41,6 +41,9 @@ pub struct BPConConfig { /// Timeout before finalization stage is launched. pub finalize_timeout: Duration, + + /// Timeout for a graceful period to help parties with latency. + pub grace_period: Duration, } /// Party status defines the statuses of the ballot for the particular participant @@ -341,6 +344,7 @@ impl> Party { finalize_fired = true; }, msg_wire = self.msg_in_receiver.recv() => { + tokio::time::sleep(self.cfg.grace_period).await; if let Some(msg_wire) = msg_wire { if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { self.status = PartyStatus::Failed; @@ -352,6 +356,7 @@ impl> Party { } }, event = self.event_receiver.recv() => { + tokio::time::sleep(self.cfg.grace_period).await; if let Some(event) = event { if let Err(err) = self.follow_event(event) { self.status = PartyStatus::Failed; @@ -756,6 +761,7 @@ mod tests { launch2av_timeout: Duration::from_secs(30), launch2b_timeout: Duration::from_secs(40), finalize_timeout: Duration::from_secs(50), + grace_period: Duration::from_secs(1), } } From 0d0906ab3030fba86dd7caf9515afe981d85b41b Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Fri, 23 Aug 2024 14:16:21 +0300 Subject: [PATCH 07/10] feat: add opportunity to configure prior to launch timeout --- src/party.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/party.rs b/src/party.rs index 3dc36b4..0dcbfd1 100644 --- a/src/party.rs +++ b/src/party.rs @@ -24,6 +24,11 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, + /// Timeout before ballot is launched. + /// Differs from `launch1a_timeout` having another status and not listening + /// to external events and messages. + pub launch_timeout: Duration, + /// Timeout before 1a stage is launched. pub launch1a_timeout: Duration, @@ -274,6 +279,8 @@ impl> Party { pub async fn launch_ballot(&mut self) -> Result, BallotError> { self.prepare_next_ballot(); + time::sleep(self.cfg.launch_timeout).await; + self.status = PartyStatus::Launched; let launch1a_timer = time::sleep(self.cfg.launch1a_timeout); @@ -755,6 +762,7 @@ mod tests { BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, + launch_timeout: Duration::from_secs(0), launch1a_timeout: Duration::from_secs(0), launch1b_timeout: Duration::from_secs(10), launch2a_timeout: Duration::from_secs(20), From bd7341ee16987f781363dd36f45c3a354c252ac3 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Sat, 24 Aug 2024 17:59:23 +0300 Subject: [PATCH 08/10] feat: changed logic for leader election --- src/party.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/party.rs b/src/party.rs index 0dcbfd1..e35caac 100644 --- a/src/party.rs +++ b/src/party.rs @@ -7,7 +7,7 @@ use crate::message::{ }; use crate::{Value, ValueSelector}; use rkyv::{AlignedVec, Deserialize, Infallible}; -use std::cmp::PartialEq; +use std::cmp::{Ordering, PartialEq}; use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::Entry::Vacant; use std::collections::{HashMap, HashSet}; @@ -241,14 +241,26 @@ impl> Party { return Err(BallotError::LeaderElection("Zero weight sum".into())); } - let mut cumulative_weight = 0; - for (i, &weight) in self.cfg.party_weights.iter().enumerate() { - cumulative_weight += weight; - if self.hash_to_range(seed, cumulative_weight) < weight { - return Ok(i as u64); + // Generate a random number in the range [0, total_weight) + let random_value = self.hash_to_range(seed, total_weight); + + // Use binary search to find the corresponding participant + let mut cumulative_weights = vec![0; self.cfg.party_weights.len()]; + cumulative_weights[0] = self.cfg.party_weights[0]; + + for i in 1..self.cfg.party_weights.len() { + cumulative_weights[i] = cumulative_weights[i - 1] + self.cfg.party_weights[i]; + } + + match cumulative_weights.binary_search_by(|&weight| { + if random_value < weight { + Ordering::Greater + } else { + Ordering::Less } + }) { + Ok(index) | Err(index) => Ok(index as u64), } - Err(BallotError::LeaderElection("Election failed".into())) } /// Compute seed for randomized leader election. From 19e7f63307bd378c38704f2862b57628cda8a2d7 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 26 Aug 2024 17:07:33 +0300 Subject: [PATCH 09/10] feat: refactor leader election components --- src/party.rs | 243 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 163 insertions(+), 80 deletions(-) diff --git a/src/party.rs b/src/party.rs index e35caac..bfd8a54 100644 --- a/src/party.rs +++ b/src/party.rs @@ -111,6 +111,72 @@ impl MessageRoundState { } } +/// Trait incorporating logic for leader election. +pub trait LeaderElector>: Send { + /// Get leader for current ballot. + /// Returns id of the elected party or error. + fn get_leader(&self, party: &Party) -> Result; +} + +pub struct DefaultLeaderElector {} + +impl DefaultLeaderElector { + /// Compute seed for randomized leader election. + fn compute_seed>(party: &Party) -> u64 { + let mut hasher = DefaultHasher::new(); + + // Hash each field that should contribute to the seed + party.cfg.party_weights.hash(&mut hasher); + party.cfg.threshold.hash(&mut hasher); + party.ballot.hash(&mut hasher); + + // You can add more fields as needed + + // Generate the seed from the hash + hasher.finish() + } + + /// Hash the seed to a value within a given range. + fn hash_to_range(seed: u64, range: u64) -> u64 { + let mut hasher = DefaultHasher::new(); + seed.hash(&mut hasher); + hasher.finish() % range + } +} + +impl> LeaderElector for DefaultLeaderElector { + /// Compute leader in a weighed randomized manner. + /// Uses seed from the config, making it deterministic. + fn get_leader(&self, party: &Party) -> Result { + let seed = DefaultLeaderElector::compute_seed(party); + + let total_weight: u64 = party.cfg.party_weights.iter().sum(); + if total_weight == 0 { + return Err(BallotError::LeaderElection("Zero weight sum".into())); + } + + // Generate a random number in the range [0, total_weight) + let random_value = DefaultLeaderElector::hash_to_range(seed, total_weight); + + // Use binary search to find the corresponding participant + let mut cumulative_weights = vec![0; party.cfg.party_weights.len()]; + cumulative_weights[0] = party.cfg.party_weights[0]; + + for i in 1..party.cfg.party_weights.len() { + cumulative_weights[i] = cumulative_weights[i - 1] + party.cfg.party_weights[i]; + } + + match cumulative_weights.binary_search_by(|&weight| { + if random_value < weight { + Ordering::Greater + } else { + Ordering::Less + } + }) { + Ok(index) | Err(index) => Ok(index as u64), + } + } +} /// Party of the BPCon protocol that executes ballot. /// /// The communication between party and external @@ -140,12 +206,18 @@ pub struct Party> { /// Main functional for value selection. value_selector: VS, + /// Main functional for leader election. + elector: Box>, + /// Status of the ballot execution status: PartyStatus, /// Current ballot number ballot: u64, + /// Current ballot leader + leader: u64, + /// Last ballot where party submitted 2b message last_ballot_voted: Option, @@ -177,6 +249,7 @@ impl> Party { id: u64, cfg: BPConConfig, value_selector: VS, + elector: Box>, ) -> ( Self, UnboundedReceiver, @@ -195,8 +268,10 @@ impl> Party { event_sender, cfg, value_selector, + elector, status: PartyStatus::None, ballot: 0, + leader: 0, last_ballot_voted: None, last_value_voted: None, parties_voted_before: HashMap::new(), @@ -231,66 +306,12 @@ impl> Party { None } - /// Compute leader in a weighed randomized manner. - /// Uses seed from the config, making it deterministic. - pub fn get_leader(&self) -> Result { - let seed = self.compute_seed(); - - let total_weight: u64 = self.cfg.party_weights.iter().sum(); - if total_weight == 0 { - return Err(BallotError::LeaderElection("Zero weight sum".into())); - } - - // Generate a random number in the range [0, total_weight) - let random_value = self.hash_to_range(seed, total_weight); - - // Use binary search to find the corresponding participant - let mut cumulative_weights = vec![0; self.cfg.party_weights.len()]; - cumulative_weights[0] = self.cfg.party_weights[0]; - - for i in 1..self.cfg.party_weights.len() { - cumulative_weights[i] = cumulative_weights[i - 1] + self.cfg.party_weights[i]; - } - - match cumulative_weights.binary_search_by(|&weight| { - if random_value < weight { - Ordering::Greater - } else { - Ordering::Less - } - }) { - Ok(index) | Err(index) => Ok(index as u64), - } - } - - /// Compute seed for randomized leader election. - fn compute_seed(&self) -> u64 { - let mut hasher = DefaultHasher::new(); - - // Hash each field that should contribute to the seed - self.cfg.party_weights.hash(&mut hasher); - self.cfg.threshold.hash(&mut hasher); - self.ballot.hash(&mut hasher); - - // You can add more fields as needed - - // Generate the seed from the hash - hasher.finish() - } - - /// Hash the seed to a value within a given range. - fn hash_to_range(&self, seed: u64, range: u64) -> u64 { - let mut hasher = DefaultHasher::new(); - seed.hash(&mut hasher); - hasher.finish() % range - } - fn get_value(&self) -> V { self.value_selector.select(&self.parties_voted_before) } pub async fn launch_ballot(&mut self) -> Result, BallotError> { - self.prepare_next_ballot(); + self.prepare_next_ballot()?; time::sleep(self.cfg.launch_timeout).await; self.status = PartyStatus::Launched; @@ -393,9 +414,10 @@ impl> Party { } /// Prepare state before running a ballot. - fn prepare_next_ballot(&mut self) { + fn prepare_next_ballot(&mut self) -> Result<(), BallotError> { self.status = PartyStatus::None; self.ballot += 1; + self.leader = self.elector.get_leader(self)?; // Clean state self.parties_voted_before = HashMap::new(); @@ -409,6 +431,7 @@ impl> Party { while self.msg_in_receiver.try_recv().is_ok() {} self.status = PartyStatus::Launched; + Ok(()) } /// Update party's state based on message type. @@ -436,7 +459,7 @@ impl> Party { )); } - if routing.sender != self.get_leader()? { + if routing.sender != self.leader { return Err(BallotError::InvalidState("Invalid leader in Msg1a".into())); } @@ -512,7 +535,7 @@ impl> Party { )); } - if routing.sender != self.get_leader()? { + if routing.sender != self.leader { return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); } @@ -626,7 +649,7 @@ impl> Party { "Cannot launch 1a, incorrect state".into(), )); } - if self.get_leader()? == self.id { + if self.leader == self.id { self.msg_out_sender .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message1aContent { @@ -676,7 +699,7 @@ impl> Party { "Cannot launch 2a, incorrect state".into(), )); } - if self.get_leader()? == self.id { + if self.leader == self.id { self.msg_out_sender .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message2aContent { @@ -788,12 +811,18 @@ mod tests { #[test] fn test_compute_leader_determinism() { let cfg = default_config(); - let party = Party::::new(0, cfg, MockValueSelector).0; + let party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; // Compute the leader multiple times - let leader1 = party.get_leader().unwrap(); - let leader2 = party.get_leader().unwrap(); - let leader3 = party.get_leader().unwrap(); + let leader1 = party.elector.get_leader(&party).unwrap(); + let leader2 = party.elector.get_leader(&party).unwrap(); + let leader3 = party.elector.get_leader(&party).unwrap(); // All leaders should be the same due to deterministic seed assert_eq!( @@ -811,9 +840,15 @@ mod tests { let mut cfg = default_config(); cfg.party_weights = vec![0, 0, 0]; - let party = Party::::new(0, cfg, MockValueSelector).0; + let party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; - match party.get_leader() { + match party.elector.get_leader(&party) { Err(BallotError::LeaderElection(_)) => { // The test passes if the error is of type LeaderElection } @@ -824,12 +859,18 @@ mod tests { #[test] fn test_update_state_msg1a() { let cfg = default_config(); - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Launched; party.ballot = 1; // Must send this message from leader of the ballot. - let leader = party.get_leader().unwrap(); + let leader = party.elector.get_leader(&party).unwrap(); let msg = Message1aContent { ballot: 1 }; let routing = MessageRouting { @@ -853,7 +894,13 @@ mod tests { #[test] fn test_update_state_msg1b() { let cfg = default_config(); - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed1a; party.ballot = 1; @@ -908,12 +955,18 @@ mod tests { #[test] fn test_update_state_msg2a() { let cfg = default_config(); - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed1b; party.ballot = 1; // Must send this message from leader of the ballot. - let leader = party.get_leader().unwrap(); + let leader = party.elector.get_leader(&party).unwrap(); let msg = Message2aContent { ballot: 1, @@ -941,7 +994,13 @@ mod tests { #[test] fn test_update_state_msg2av() { let cfg = default_config(); - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed2a; party.ballot = 1; party.value_2a = Some(MockValue(42)); @@ -995,7 +1054,13 @@ mod tests { #[test] fn test_update_state_msg2b() { let cfg = default_config(); - let mut party = Party::::new(0, cfg, MockValueSelector).0; + let mut party = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ) + .0; party.status = PartyStatus::Passed2av; party.ballot = 1; @@ -1059,7 +1124,12 @@ mod tests { fn test_follow_event_launch1a() { let cfg = default_config(); let (mut party, _msg_out_receiver, _msg_in_sender) = - Party::::new(0, cfg, MockValueSelector); + Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Launched; party.ballot = 1; @@ -1075,13 +1145,17 @@ mod tests { #[test] fn test_ballot_reset_after_failure() { let cfg = default_config(); - let (mut party, _, _) = - Party::::new(0, cfg, MockValueSelector); + let (mut party, _, _) = Party::::new( + 0, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Failed; party.ballot = 1; - party.prepare_next_ballot(); + party.prepare_next_ballot().unwrap(); // Check that state has been reset assert_eq!(party.status, PartyStatus::Launched); @@ -1102,8 +1176,12 @@ mod tests { // Because we need leader to send 1a. let party_id = 0; - let (mut party, msg_out_receiver, _) = - Party::::new(party_id, cfg, MockValueSelector); + let (mut party, msg_out_receiver, _) = Party::::new( + party_id, + cfg, + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); party.status = PartyStatus::Launched; party.ballot = 1; @@ -1135,7 +1213,12 @@ mod tests { // Need to return all 3 values, so that they don't get dropped // and associated channels don't get closed. let (mut party, _msg_out_receiver, _msg_in_sender) = - Party::::new(0, cfg.clone(), MockValueSelector); + Party::::new( + 0, + cfg.clone(), + MockValueSelector, + Box::new(DefaultLeaderElector {}), + ); // Same here, we would like to not lose party's event_receiver, so that test doesn't fail. let _event_sender = party.event_sender; From f5b1111eacc81e6b088f11eae3148f6114795546 Mon Sep 17 00:00:00 2001 From: Oleg Fomenko <35123037+olegfomenko@users.noreply.github.com> Date: Thu, 29 Aug 2024 14:47:30 +0300 Subject: [PATCH 10/10] Rewriting the hash_to_range function (#8) * rewriting the hash_to_range function to achieve better uniform distribution. Now it uses seeded random based on ChaCha12Rng instead of DefaultHash as before. Also, it fixes distribution in range by executing selection several times to achieve uniform distribution when range != 2^k * Adding comments * fix: resolved linter issues/failing tests --------- Co-authored-by: Nikita Masych --- .github/workflows/rust.yml | 1 + Cargo.toml | 2 + src/party.rs | 109 ++++++++++++++++++++++++++++++++++--- 3 files changed, 105 insertions(+), 7 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f8d9382..4e6ca37 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -4,6 +4,7 @@ on: pull_request: branches: - main + - epic/* push: branches: - main diff --git a/Cargo.toml b/Cargo.toml index 2fc08a5..4d16495 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,5 @@ rkyv = { version = "0.7.44", features = ["validation"]} serde = { version = "1.0.207", features = ["derive"] } bincode = "1.3.3" tokio = { version = "1.39.2", features = ["full", "test-util"] } +rand = "0.9.0-alpha.2" +seeded-random = "0.6.0" \ No newline at end of file diff --git a/src/party.rs b/src/party.rs index bfd8a54..c32ddd1 100644 --- a/src/party.rs +++ b/src/party.rs @@ -7,6 +7,7 @@ use crate::message::{ }; use crate::{Value, ValueSelector}; use rkyv::{AlignedVec, Deserialize, Infallible}; +use seeded_random::{Random, Seed}; use std::cmp::{Ordering, PartialEq}; use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::Entry::Vacant; @@ -138,9 +139,29 @@ impl DefaultLeaderElector { /// Hash the seed to a value within a given range. fn hash_to_range(seed: u64, range: u64) -> u64 { - let mut hasher = DefaultHasher::new(); - seed.hash(&mut hasher); - hasher.finish() % range + // Select the `k` suck that value 2^k >= `range` and 2^k is the smallest. + let mut k = 64; + while 1u64 << (k - 1) >= range { + k -= 1; + } + + // The following algorithm selects a random u64 value using `ChaCha12Rng` + // and reduces the result to the k-bits such that 2^k >= `range` the closes power of to the `range`. + // After we check if the result lies in [0..`range`) or [`range`..2^k). + // In the first case result is an acceptable value generated uniformly. + // In the second case we repeat the process again with the incremented iterations counter. + // Ref: Practical Cryptography 1st Edition by Niels Ferguson, Bruce Schneier, paragraph 10.8 + let rng = Random::from_seed(Seed::unsafe_new(seed)); + loop { + let mut raw_res: u64 = rng.gen(); + raw_res >>= 64 - k; + + if raw_res < range { + return raw_res; + } + // Executing this loop does not require a large number of iterations. + // Check tests for more info + } } } @@ -772,7 +793,10 @@ impl> Party { mod tests { use super::*; + use rand::Rng; + use seeded_random::{Random, Seed}; use std::collections::HashMap; + use std::thread; // Mock implementation of Value #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -870,11 +894,11 @@ mod tests { party.ballot = 1; // Must send this message from leader of the ballot. - let leader = party.elector.get_leader(&party).unwrap(); + party.leader = 0; // this party's id let msg = Message1aContent { ballot: 1 }; let routing = MessageRouting { - sender: leader, + sender: 0, receivers: vec![2, 3], is_broadcast: false, msg_type: ProtocolMessage::Msg1a, @@ -966,14 +990,14 @@ mod tests { party.ballot = 1; // Must send this message from leader of the ballot. - let leader = party.elector.get_leader(&party).unwrap(); + party.leader = 0; // this party's id let msg = Message2aContent { ballot: 1, value: bincode::serialize(&MockValue(42)).unwrap(), }; let routing = MessageRouting { - sender: leader, + sender: 0, receivers: vec![0], is_broadcast: false, msg_type: ProtocolMessage::Msg2a, @@ -1248,4 +1272,75 @@ mod tests { time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize); } + + fn debug_hash_to_range_new(seed: u64, range: u64) -> u64 { + assert!(range > 1); + + let mut k = 64; + while 1u64 << (k - 1) >= range { + k -= 1; + } + + let rng = Random::from_seed(Seed::unsafe_new(seed)); + + let mut iteration = 1u64; + loop { + let mut raw_res: u64 = rng.gen(); + raw_res >>= 64 - k; + + if raw_res < range { + return raw_res; + } + + iteration += 1; + assert!(iteration <= 50) + } + } + + #[test] + #[ignore] // Ignoring since it takes a while to run + fn test_hash_range_random() { + // test the uniform distribution + + const N: usize = 37; + const M: i64 = 10000000; + + let mut cnt1: [i64; N] = [0; N]; + + for _ in 0..M { + let mut rng = rand::thread_rng(); + let seed: u64 = rng.random(); + + let res1 = debug_hash_to_range_new(seed, N as u64); + assert!(res1 < N as u64); + + cnt1[res1 as usize] += 1; + } + + println!("1: {:?}", cnt1); + + let mut avg1: i64 = 0; + + for item in cnt1.iter().take(N) { + avg1 += (M / (N as i64) - item).abs(); + } + + avg1 /= N as i64; + + println!("Avg 1: {}", avg1); + } + + #[test] + fn test_rng() { + let rng1 = Random::from_seed(Seed::unsafe_new(123456)); + let rng2 = Random::from_seed(Seed::unsafe_new(123456)); + + println!("{}", rng1.gen::()); + println!("{}", rng2.gen::()); + + thread::sleep(Duration::from_secs(2)); + + println!("{}", rng1.gen::()); + println!("{}", rng2.gen::()); + } }