diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4e6ca37..8b7bea8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -13,6 +13,9 @@ on: permissions: contents: read +env: + CARGO_TERM_COLOR: always + jobs: check: name: Check @@ -83,12 +86,14 @@ jobs: profile: minimal toolchain: ${{ matrix.rust }} - - name: Run cargo test - uses: actions-rs/cargo@v1 - continue-on-error: false + - name: Install Nextest + uses: taiki-e/install-action@v2 with: - command: test - args: --all-features --verbose + tool: nextest + + - name: Run Nextest + # We have non-deterministic integration tests, thus adding retries. + run: cargo nextest run --all-features --retries 3 lints: name: Lints @@ -133,4 +138,4 @@ jobs: continue-on-error: false with: command: clippy - args: --all-targets -- -D warnings + args: --all-targets --all-features -- -D warnings diff --git a/Cargo.toml b/Cargo.toml index e1f65ab..527aacf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" description = "BPCon: A Byzantine Fault-Tolerant Consensus Protocol Implementation in Rust." license = "MIT" repository = "https://github.com/distributed-lab/bpcon" -homepage = "https://github.com/distributed-lab/bpcon" +homepage = "https://github.com/distributed-lab/bpcon#readme" documentation = "https://distributed-lab.github.io/bpcon/" keywords = ["consensus", "byzantine", "protocol", "distributed-systems", "blockchain"] categories = ["algorithms"] @@ -23,8 +23,13 @@ seeded-random = "^0.6.0" thiserror = "^1.0.63" [features] -default = ["full"] -full = ["tokio/full", "rkyv/validation"] +default = ["tokio/full", "rkyv/validation"] +test-mocks = [] [dev-dependencies] tokio = { version = "^1.39.2", features = ["test-util"] } +futures = "0.3.30" + +[[test]] +name = "mod" +required-features = ["test-mocks"] \ No newline at end of file diff --git a/README.md b/README.md index b61b98c..e643aa8 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ bpcon = {version = "0.1.0", git = "https://github.com/distributed-lab/bpcon"} This is a core trait, which defines what type are you selecting in your consensus. It may be the next block in blockchain, or leader for some operation, or anything else you need. -Below is a simple example, where we will operate on selection for `u64` type. +Below is a simple example, where we will operate on selection for `u64` type. Using it you may interpret `ID` for leader of distributed operation, for instance. ```rust @@ -40,7 +40,7 @@ impl Value for MyValue {} ### Implement [ValueSelector](https://distributed-lab.github.io/bpcon/bpcon/value/trait.ValueSelector.html) trait -`BPCon` allows you to define specific conditions how proposer (leader) will select value +`BPCon` allows you to define specific conditions how proposer (leader) will select value and how other members will verify its selection. Here is a simple example: @@ -91,11 +91,11 @@ impl ValueSelector for MyValueSelector { `LeaderElector` trait allows you to define specific conditions, how to select leader for consensus. -__NOTE: it is important to provide deterministic mechanism, -because each participant will compute leader for itself +__NOTE: it is important to provide deterministic mechanism, +because each participant will compute leader for itself, and in case it is not deterministic, state divergence occurs.__ -We also provide ready-to-use +We also provide ready-to-use [DefaultLeaderElector](https://distributed-lab.github.io/bpcon/bpcon/leader/struct.DefaultLeaderElector.html) which is using weighted randomization. @@ -115,21 +115,53 @@ use bpcon::config::BPConConfig; let cfg = BPConConfig::with_default_timeouts(vec![1, 1, 1, 1, 1, 1], 4); ``` -Feel free to explore [config.rs](https://distributed-lab.github.io/bpcon/bpcon/config/struct.BPConConfig.html) +Feel free to explore [config.rs](https://distributed-lab.github.io/bpcon/bpcon/config/struct.BPConConfig.html) for more information. ### Create parties -Having `BPConConfig`, `ValueSelector` and `LeaderElector` defined, instantiate your parties. +Having `BPConConfig`, `ValueSelector` and `LeaderElector` defined, instantiate your parties. Check out [new](https://distributed-lab.github.io/bpcon/bpcon/party/struct.Party.html#method.new) method on a `Party` struct. ### Launch ballot on parties and handle messages -Each party interfaces communication with external system via channels. +Each party interfaces communication with external system via channels. In a way, you shall propagate outgoing messages to other parties like: 1. Listen for outgoing message using `msg_out_receiver`. 2. Forward it to other parties using `msg_in_sender`. -We welcome you to check `test_end_to_end_ballot` in `party.rs` for example. +We welcome you to check our [integration tests](./tests) for examples. + +## Security Considerations 🔐 + +### Categories of parties + +In real world applications, we may categorize parties by their behavior to following: + +1. Good - party sends messages to other participants based on following events, + and correctly receives and processes messages from other parties. + +2. Faulty - party has troubles receiving/sending messages. + These are simply mitigated by the weighed threshold and redundancy of consensus participants. + +3. Malicious - party launches DDoS attack using unbounded sending of messages - + to deal with this, we introduce rate-limiting mechanism in accepting messages inside the `Party`, + however it is also ❗️ required by integrating 'external' system ❗️, which handles `P2P`, to attest to this, because otherwise receiving channel may get flooded by malicious messages and block messages from other parties. + Another way to cause trouble is by sending invalid messages. For this, each party has + a set of checks for certain fields like current ballot number, status, etc. + Additionally, if the state transition caused by incoming message errored, it does not impact the party in either way. + +### Note on the leader 👑 + +If the `leader` of the ballot is faulty or malicious, the ballot deterministically fails and needs to be relaunched. + +### Note on the communication discrepancies 🔇 + +Each party has a certain period in which it may accept particular messages for a certain stage +(example: having passed 1a stage, it is open for accepting only 1b messages for 2 seconds). +These periods are configurable using `BPConConfig`, meaning you can program certain ranges +to allow slow parties to catch up, while others are waiting, before transiting to the next stage. + +In addition it is possible to schedule parties to launch at specific absolute time. diff --git a/src/config.rs b/src/config.rs index 5bf9bef..e880b29 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,7 @@ //! Definitions central to BPCon configuration. use std::time::Duration; +use tokio::time::Instant; /// Configuration structure for BPCon. /// @@ -18,15 +19,15 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum. /// - /// This value must be greater than 2/3 of the total weight of all parties combined. + /// This value must be greater than or equal to 2/3 of the total weight of all parties combined. /// The quorum is the minimum weight required to make decisions in the BPCon protocol. pub threshold: u128, - /// Timeout before the ballot is launched. + /// Absolute time, at which party begins to work. /// /// This timeout differs from `launch1a_timeout` as it applies to a distinct status /// and does not involve listening to external events and messages. - pub launch_timeout: Duration, + pub launch_at: Instant, /// Timeout before the 1a stage is launched. /// @@ -96,14 +97,47 @@ impl BPConConfig { party_weights, threshold, // TODO: deduce actually good defaults. - launch_timeout: Duration::from_secs(0), - launch1a_timeout: Duration::from_secs(5), - launch1b_timeout: Duration::from_secs(10), - launch2a_timeout: Duration::from_secs(15), - launch2av_timeout: Duration::from_secs(20), - launch2b_timeout: Duration::from_secs(25), - finalize_timeout: Duration::from_secs(30), - grace_period: Duration::from_secs(1), + launch_at: Instant::now(), + launch1a_timeout: Duration::from_millis(200), + launch1b_timeout: Duration::from_millis(400), + launch2a_timeout: Duration::from_millis(600), + launch2av_timeout: Duration::from_millis(800), + launch2b_timeout: Duration::from_millis(1000), + finalize_timeout: Duration::from_millis(1200), + grace_period: Duration::from_millis(0), } } + + /// Compute the Byzantine Fault Tolerance (BFT) threshold for the consensus protocol. + /// + /// This function calculates the minimum weight required to achieve a BFT quorum. + /// In BFT systems, consensus is typically reached when more than two-thirds + /// of the total weight is gathered from non-faulty parties. + /// + /// # Parameters + /// + /// - `party_weights`: A vector of weights corresponding to each party involved in the consensus. + /// These weights represent the voting power or influence of each party in the protocol. + /// + /// # Returns + /// + /// The BFT threshold as a `u128` value, which represents the minimum total weight + /// required to achieve consensus in a Byzantine Fault Tolerant system. This is calculated + /// as two-thirds of the total party weights. + /// + /// # Example + /// + /// ``` + /// use bpcon::config::BPConConfig; + /// + /// let party_weights = vec![10, 20, 30, 40, 50]; + /// let threshold = BPConConfig::compute_bft_threshold(party_weights); + /// assert_eq!(threshold, 100); + /// ``` + /// + /// In the example above, the total weight is 150, and the BFT threshold is calculated as `2/3 * 150 = 100`. + pub fn compute_bft_threshold(party_weights: Vec) -> u128 { + let total_weight: u128 = party_weights.iter().map(|&w| w as u128).sum(); + (2 * total_weight + 2) / 3 // adding 2 to keep division ceiling. + } } diff --git a/src/leader.rs b/src/leader.rs index d565b9a..ea6041c 100644 --- a/src/leader.rs +++ b/src/leader.rs @@ -155,42 +155,50 @@ impl> LeaderElector for DefaultLeaderElect #[cfg(test)] mod tests { use super::*; - use crate::party::tests::{default_config, default_party}; + use crate::config::BPConConfig; + use crate::test_mocks::MockParty; use rand::Rng; use std::thread; use std::time::Duration; #[test] fn test_default_leader_elector_determinism() { - let party = default_party(); + let party = MockParty::default(); let elector = DefaultLeaderElector::new(); - let leader1 = elector.elect_leader(&party).unwrap(); + const ITERATIONS: usize = 10; - // Test multiple iterations to ensure the leader remains the same - for i in 2..=10 { - let leader = elector.elect_leader(&party).unwrap(); - assert_eq!( - leader1, leader, - "Leaders should be consistent on repeated calls (iteration {})", - i - ); + // Collect multiple leaders + let leaders: Vec<_> = (0..ITERATIONS) + .map(|_| elector.elect_leader(&party).unwrap()) + .collect(); + + // Match the first leader and ensure all others are the same + match &leaders[..] { + [first_leader, rest @ ..] => { + assert!( + rest.iter().all(|leader| leader == first_leader), + "All leaders should be the same across multiple iterations." + ); + } + _ => panic!("No leaders were collected!"), } } #[test] fn test_default_leader_elector_fail_with_zero_weights() { - let mut party = default_party(); - let mut cfg = default_config(); - cfg.party_weights = vec![0, 0, 0]; + let mut party = MockParty::default(); + let cfg = BPConConfig { + party_weights: vec![0, 0, 0], + ..Default::default() + }; party.cfg = cfg; - let elector = DefaultLeaderElector::new(); - match elector.elect_leader(&party) { - Err(_) => {} // This is the expected behavior - _ => panic!("Expected DefaultLeaderElectorError::ZeroWeightSum"), - } + assert!( + elector.elect_leader(&party).is_err(), + "Expected DefaultLeaderElectorError::ZeroWeightSum" + ); } fn debug_hash_to_range_new(seed: u64, range: u64) -> u64 { @@ -218,7 +226,7 @@ mod tests { } #[test] - #[ignore] // Ignoring since it takes a while to run + #[ignore = "takes too long to run, launch manually"] fn test_hash_range_random() { // Test the uniform distribution diff --git a/src/lib.rs b/src/lib.rs index db83c12..b967d06 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,4 +3,6 @@ pub mod error; pub mod leader; pub mod message; pub mod party; +#[cfg(any(test, feature = "test-mocks"))] +pub mod test_mocks; pub mod value; diff --git a/src/message.rs b/src/message.rs index fdb6302..058aed7 100644 --- a/src/message.rs +++ b/src/message.rs @@ -30,7 +30,7 @@ pub struct MessageRouting { /// /// These message types represent the various stages of the BPCon consensus protocol, /// each corresponding to a specific phase in the process. -#[derive(PartialEq, Eq, Debug, Copy, Clone)] +#[derive(PartialEq, Eq, Debug, Copy, Clone, Hash)] pub enum ProtocolMessage { Msg1a, Msg1b, diff --git a/src/party.rs b/src/party.rs index 8282dc6..7e78c8a 100644 --- a/src/party.rs +++ b/src/party.rs @@ -23,9 +23,9 @@ use crate::value::{Value, ValueSelector}; use log::{debug, warn}; use std::cmp::PartialEq; use std::collections::hash_map::Entry::Vacant; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::time::sleep; +use tokio::time::{sleep, sleep_until}; /// Represents the status of a `Party` in the BPCon consensus protocol. /// @@ -120,6 +120,9 @@ pub struct Party> { /// The last value for which this party submitted a 2b message. last_value_voted: Option, + /// DDoS prevention mechanism - we allow each party to send one message type per ballot. + rate_limiter: HashSet<(ProtocolMessage, u64)>, + // Local round fields /// The state of 1b round, tracking which parties have voted and their corresponding values. parties_voted_before: HashMap>, @@ -182,6 +185,7 @@ impl> Party { leader: 0, last_ballot_voted: None, last_value_voted: None, + rate_limiter: HashSet::new(), parties_voted_before: HashMap::new(), messages_1b_weight: 0, value_2a: None, @@ -249,10 +253,10 @@ impl> Party { /// - `Ok(Some(V))`: The selected value if the ballot reaches consensus. /// - `Ok(None)`: If the ballot process is terminated without reaching consensus. /// - `Err(LaunchBallotError)`: If an error occurs during the ballot process. - pub async fn launch_ballot(&mut self) -> Result, LaunchBallotError> { + pub async fn launch_ballot(&mut self) -> Result { self.prepare_next_ballot()?; - sleep(self.cfg.launch_timeout).await; + sleep_until(self.cfg.launch_at).await; let launch1a_timer = sleep(self.cfg.launch1a_timeout); let launch1b_timer = sleep(self.cfg.launch1b_timeout); @@ -277,7 +281,7 @@ impl> Party { let mut launch2b_fired = false; let mut finalize_fired = false; - while self.is_launched() { + while self.status != PartyStatus::Finished { tokio::select! { _ = &mut launch1a_timer, if !launch1a_fired => { self.event_sender.send(PartyEvent::Launch1a).map_err(|err| { @@ -324,12 +328,27 @@ impl> Party { msg = self.msg_in_receiver.recv() => { sleep(self.cfg.grace_period).await; if let Some(msg) = msg { - debug!("Party {} received {} from party {}", self.id, msg.routing.msg_type, msg.routing.sender); + let meta = (msg.routing.msg_type, msg.routing.sender); + debug!("Party {} received {} from party {}", self.id, meta.0, meta.1); + + if self.id == meta.1{ + warn!("Received own message {}, intended to be broadcasted.", meta.0); + continue + } + if self.rate_limiter.contains(&meta){ + warn!("Party {} hit rate limit in party {} for message {}", meta.1, self.id, meta.0); + continue + } + if let Err(err) = self.update_state(&msg) { // Shouldn't fail the party, since invalid message - // may be sent by anyone. - warn!("Failed to update state with {}, got error: {err}", msg.routing.msg_type) + // may be sent by anyone. Furthermore, since in consensus + // we are relying on redundancy of parties, we actually may need + // less messages than from every party to transit to next status. + warn!("Failed to update state for party {} with {}, got error: {err}", self.id, meta.0) } + self.rate_limiter.insert(meta); + }else if self.msg_in_receiver.is_closed(){ self.status = PartyStatus::Failed; return Err(MessageChannelClosed) @@ -344,13 +363,13 @@ impl> Party { } }else if self.event_receiver.is_closed(){ self.status = PartyStatus::Failed; - return Err(EventChannelClosed) + return Err(EventChannelClosed) } }, } } - Ok(self.get_value_selected()) + Ok(self.get_value_selected().unwrap()) } /// Prepares the party's state for the next ballot. @@ -376,6 +395,7 @@ impl> Party { /// /// This method clears the state associated with previous rounds and prepares the party for the next ballot. fn reset_state(&mut self) { + self.rate_limiter = HashSet::new(); self.parties_voted_before = HashMap::new(); self.messages_1b_weight = 0; self.value_2a = None; @@ -474,7 +494,8 @@ impl> Party { self.messages_1b_weight += self.cfg.party_weights[routing.sender as usize] as u128; - if self.messages_1b_weight > self.cfg.threshold { + let self_weight = self.cfg.party_weights[self.id as usize] as u128; + if self.messages_1b_weight >= self.cfg.threshold - self_weight { self.status = PartyStatus::Passed1b; } } @@ -554,7 +575,8 @@ impl> Party { self.cfg.party_weights[routing.sender as usize] as u128, ); - if self.messages_2av_state.get_weight() > self.cfg.threshold { + let self_weight = self.cfg.party_weights[self.id as usize] as u128; + if self.messages_2av_state.get_weight() >= self.cfg.threshold - self_weight { self.status = PartyStatus::Passed2av; } } @@ -586,7 +608,8 @@ impl> Party { self.cfg.party_weights[routing.sender as usize] as u128, ); - if self.messages_2b_state.get_weight() > self.cfg.threshold { + let self_weight = self.cfg.party_weights[self.id as usize] as u128; + if self.messages_2b_state.get_weight() >= self.cfg.threshold - self_weight { self.status = PartyStatus::Passed2b; } } @@ -741,58 +764,23 @@ impl> Party { } #[cfg(test)] -pub(crate) mod tests { +mod tests { use super::*; use crate::leader::DefaultLeaderElector; - use crate::party::PartyStatus::{Launched, Passed1a, Passed1b, Passed2a}; - use std::collections::HashMap; - use std::fmt::{Display, Formatter}; - use std::time::Duration; + use crate::test_mocks::{MockParty, MockValue}; use tokio::time; - #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Debug)] - pub(crate) struct MockValue(u64); - - impl Display for MockValue { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "MockValue: {}", self.0) - } - } - - impl Value for MockValue {} - - #[derive(Clone)] - pub(crate) struct MockValueSelector; - - impl ValueSelector for MockValueSelector { - fn verify(&self, _v: &MockValue, _m: &HashMap>) -> bool { - true // For testing, always return true - } - - fn select(&self, _m: &HashMap>) -> MockValue { - MockValue(1) // For testing, always return the same value - } - } - - pub(crate) fn default_config() -> BPConConfig { - BPConConfig::with_default_timeouts(vec![1, 2, 3], 4) - } - - pub(crate) fn default_party() -> Party { - Party::::new( - 0, - default_config(), - MockValueSelector, - Box::new(DefaultLeaderElector::new()), - ) - .0 - } - #[test] fn test_update_state_msg1a() { - let mut party = default_party(); - party.status = Launched; + // We wish to simulate behavior of leader + // sending message to another participant. + let mut party = MockParty { + status: PartyStatus::Launched, + id: 0, + leader: 1, + ..Default::default() + }; let content = Message1aContent { ballot: party.ballot, }; @@ -800,189 +788,158 @@ pub(crate) mod tests { party.update_state(&msg).unwrap(); - assert_eq!(party.status, Passed1a); + assert_eq!(party.status, PartyStatus::Passed1a); } #[test] fn test_update_state_msg1b() { - let mut party = default_party(); - party.status = Passed1a; + let mut party = MockParty { + status: PartyStatus::Passed1a, + ..Default::default() + }; let content = Message1bContent { ballot: party.ballot, last_ballot_voted: None, - last_value_voted: bincode::serialize(&MockValue(42)).ok(), + last_value_voted: bincode::serialize(&MockValue::default()).ok(), }; - // First, send a 1b message from party 1 (weight 2) + // First, send a 1b message from party 1. let msg = content.pack(1).unwrap(); party.update_state(&msg).unwrap(); - // Then, send a 1b message from party 2 (weight 3) + // Then, send a 1b message from party 2. let msg = content.pack(2).unwrap(); party.update_state(&msg).unwrap(); - // After both messages, the cumulative weight is 2 + 3 = 5, which exceeds the threshold - assert_eq!(party.status, Passed1b); + // After both messages, the cumulative weight is + // 1 (own) + 1 (party 1) + 1 (party 2) = 3, which satisfies the threshold. + assert_eq!(party.status, PartyStatus::Passed1b); } #[test] fn test_update_state_msg2a() { - let mut party = default_party(); - party.status = Passed1b; - party.leader = 1; + let mut party = MockParty { + status: PartyStatus::Passed1b, + ..Default::default() + }; let content = Message2aContent { ballot: party.ballot, - value: bincode::serialize(&MockValue(42)).unwrap(), + value: bincode::serialize(&MockValue::default()).unwrap(), }; - let msg = content.pack(1).unwrap(); + let msg = content.pack(party.leader).unwrap(); + party.update_state(&msg).unwrap(); - assert_eq!(party.status, Passed2a); + assert_eq!(party.status, PartyStatus::Passed2a); } #[test] fn test_update_state_msg2av() { - let mut party = default_party(); - party.status = Passed2a; - party.value_2a = Some(MockValue(1)); + let value_to_verify = MockValue::default(); + let mut party = MockParty { + status: PartyStatus::Passed2a, + value_2a: Some(value_to_verify.clone()), + ..Default::default() + }; let content = Message2avContent { ballot: party.ballot, - received_value: bincode::serialize(&MockValue(1)).unwrap(), + received_value: bincode::serialize(&value_to_verify).unwrap(), }; - // Send first 2av message from party 1 (weight 2) + // First, send a 2av message from party 1. let msg = content.pack(1).unwrap(); party.update_state(&msg).unwrap(); - // Now send a second 2av message from party 2 (weight 3) + // Then, send a 2av message from party 2. let msg = content.pack(2).unwrap(); party.update_state(&msg).unwrap(); - // The cumulative weight (2 + 3) should exceed the threshold of 4 + // After both messages, the cumulative weight is + // 1 (own) + 1 (party 1) + 1 (party 2) = 3, which satisfies the threshold. assert_eq!(party.status, PartyStatus::Passed2av); } #[test] fn test_update_state_msg2b() { - let mut party = default_party(); - party.status = PartyStatus::Passed2av; + let mut party = MockParty { + status: PartyStatus::Passed2av, + ..Default::default() + }; - // Simulate that both party 1 and party 2 already sent 2av messages - party.messages_2av_state.add_sender(1, 2); - party.messages_2av_state.add_sender(2, 3); + // Simulate that both party 1 and party 2 have already sent 2av messages. + party.messages_2av_state.add_sender(1, 1); + party.messages_2av_state.add_sender(2, 1); let content = Message2bContent { ballot: party.ballot, }; - // Send first 2b message from party 1 (weight 2) + // First, send 2b message from party 1. let msg = content.pack(1).unwrap(); party.update_state(&msg).unwrap(); - // Print the current state and weight - println!( - "After first Msg2b: Status = {}, 2b Weight = {}", - party.status, - party.messages_2b_state.get_weight() - ); - - // Now send a second 2b message from party 2 (weight 3) + // Then, send 2b message from party 2. let msg = content.pack(2).unwrap(); party.update_state(&msg).unwrap(); - // Print the current state and weight - println!( - "After second Msg2b: Status = {}, 2b Weight = {}", - party.status, - party.messages_2b_state.get_weight() - ); - - // The cumulative weight (3 + 2) should exceed the threshold of 4 + // After both messages, the cumulative weight is + // 1 (own) + 1 (party 1) + 1 (party 2) = 3, which satisfies the threshold. assert_eq!(party.status, PartyStatus::Passed2b); } #[test] fn test_follow_event_launch1a() { - let cfg = default_config(); // Need to take ownership of msg_out_receiver, so that sender doesn't close, - // since otherwise msg_out_receiver will be dropped. - let (mut party, _msg_out_receiver, _) = Party::::new( - 0, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), + // since otherwise msg_out_receiver will be dropped and party will fail. + let (mut party, _receiver_from, _) = MockParty::new( + Default::default(), + Default::default(), + Default::default(), + Box::new(DefaultLeaderElector::default()), ); - party.status = Launched; + party.status = PartyStatus::Launched; party.leader = party.id; - party - .follow_event(PartyEvent::Launch1a) - .expect("Failed to follow Launch1a event"); + party.follow_event(PartyEvent::Launch1a).unwrap(); // If the party is the leader and in the Launched state, the event should trigger a message. // And it's status shall update to Passed1a after sending 1a message, // contrary to other participants, whose `Passed1a` updates only after receiving 1a message. - assert_eq!(party.status, Passed1a); - } - - #[test] - fn test_follow_event_communication_failure() { - // msg_out_receiver channel, bound to corresponding sender, which will try to use - // follow event, is getting dropped since we don't take ownership of it - // upon creation of the party - let mut party = default_party(); - party.status = Launched; - party.leader = party.id; - - let result = party.follow_event(PartyEvent::Launch1a); - - match result { - Err(FailedToSendMessage(_)) => { - // this is expected outcome - } - _ => panic!( - "Expected FollowEventError::FailedToSendMessage, got {:?}", - result - ), - } + assert_eq!(party.status, PartyStatus::Passed1a); } #[tokio::test] async fn test_launch_ballot_events() { - // Pause the Tokio time so we can manipulate it + // Pause the Tokio time so we can manipulate it. time::pause(); - // Set up the Party with necessary configuration - let cfg = default_config(); + let cfg = BPConConfig::default(); - let (event_sender, mut event_receiver) = unbounded_channel(); + // Returning both channels so that party won't fail, + // because associated channels will close otherwise. + let (mut party, _receiver_from, _sender_into) = MockParty::new( + Default::default(), + cfg.clone(), + Default::default(), + Box::new(DefaultLeaderElector::default()), + ); - // Need to return all 3 values, so that they don't get dropped - // and associated channels don't get closed. - let (mut party, _msg_out_receiver, _msg_in_sender) = - Party::::new( - 0, - cfg.clone(), - MockValueSelector, - Box::new(DefaultLeaderElector::new()), - ); - - // Same here, we would like to not lose party's event_receiver, so that test doesn't fail. + let (event_sender, mut event_receiver) = unbounded_channel(); + // Keeping, so that associated party's event_receiver won't close + // and it doesn't fail. let _event_sender = party.event_sender; party.event_sender = event_sender; - // Spawn the launch_ballot function in a separate task - let _ballot_task = tokio::spawn(async move { - party.launch_ballot().await.unwrap(); + // Spawn the launch_ballot function in a separate task. + _ = tokio::spawn(async move { + _ = party.launch_ballot().await; }); - time::advance(cfg.launch_timeout).await; - - // Sequential time advance and event check + // Sequential time advance and event check. time::advance(cfg.launch1a_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a); @@ -1002,156 +959,4 @@ pub(crate) mod tests { time::advance(cfg.finalize_timeout - cfg.launch2b_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize); } - - #[tokio::test] - async fn test_end_to_end_ballot() { - // Configuration for the parties - let cfg = BPConConfig::with_default_timeouts(vec![1, 1, 1, 1], 2); - - // ValueSelector and LeaderElector instances - let value_selector = MockValueSelector; - let leader_elector = Box::new(DefaultLeaderElector::new()); - - // Create 4 parties - let (mut party0, msg_out_receiver0, msg_in_sender0) = - Party::::new( - 0, - cfg.clone(), - value_selector.clone(), - leader_elector.clone(), - ); - let (mut party1, msg_out_receiver1, msg_in_sender1) = - Party::::new( - 1, - cfg.clone(), - value_selector.clone(), - leader_elector.clone(), - ); - let (mut party2, msg_out_receiver2, msg_in_sender2) = - Party::::new( - 2, - cfg.clone(), - value_selector.clone(), - leader_elector.clone(), - ); - let (mut party3, msg_out_receiver3, msg_in_sender3) = - Party::::new( - 3, - cfg.clone(), - value_selector.clone(), - leader_elector.clone(), - ); - - // Channels for receiving the selected values - let (value_sender0, value_receiver0) = tokio::sync::oneshot::channel(); - let (value_sender1, value_receiver1) = tokio::sync::oneshot::channel(); - let (value_sender2, value_receiver2) = tokio::sync::oneshot::channel(); - let (value_sender3, value_receiver3) = tokio::sync::oneshot::channel(); - - // Launch ballot tasks for each party - let ballot_task0 = tokio::spawn(async move { - match party0.launch_ballot().await { - Ok(Some(value)) => { - value_sender0.send(value).unwrap(); - } - Ok(None) => { - eprintln!("Party 0: No value was selected"); - } - Err(err) => { - eprintln!("Party 0 encountered an error: {:?}", err); - } - } - }); - - let ballot_task1 = tokio::spawn(async move { - match party1.launch_ballot().await { - Ok(Some(value)) => { - value_sender1.send(value).unwrap(); - } - Ok(None) => { - eprintln!("Party 1: No value was selected"); - } - Err(err) => { - eprintln!("Party 1 encountered an error: {:?}", err); - } - } - }); - - let ballot_task2 = tokio::spawn(async move { - match party2.launch_ballot().await { - Ok(Some(value)) => { - value_sender2.send(value).unwrap(); - } - Ok(None) => { - eprintln!("Party 2: No value was selected"); - } - Err(err) => { - eprintln!("Party 2 encountered an error: {:?}", err); - } - } - }); - - let ballot_task3 = tokio::spawn(async move { - match party3.launch_ballot().await { - Ok(Some(value)) => { - value_sender3.send(value).unwrap(); - } - Ok(None) => { - eprintln!("Party 3: No value was selected"); - } - Err(err) => { - eprintln!("Party 3 encountered an error: {:?}", err); - } - } - }); - - // Simulate message passing between the parties - tokio::spawn(async move { - let mut receivers = [ - msg_out_receiver0, - msg_out_receiver1, - msg_out_receiver2, - msg_out_receiver3, - ]; - let senders = [ - msg_in_sender0, - msg_in_sender1, - msg_in_sender2, - msg_in_sender3, - ]; - - loop { - for (i, receiver) in receivers.iter_mut().enumerate() { - if let Ok(msg) = receiver.try_recv() { - // Broadcast the message to all other parties - for (j, sender) in senders.iter().enumerate() { - if i != j { - sender.send(msg.clone()).unwrap(); - } - } - } - } - - // Delay to simulate network latency - sleep(Duration::from_millis(100)).await; - } - }); - - // Await the completion of ballot tasks - ballot_task0.await.unwrap(); - ballot_task1.await.unwrap(); - ballot_task2.await.unwrap(); - ballot_task3.await.unwrap(); - - // Await results from each party - let value0 = value_receiver0.await.unwrap(); - let value1 = value_receiver1.await.unwrap(); - let value2 = value_receiver2.await.unwrap(); - let value3 = value_receiver3.await.unwrap(); - - // Check that all parties reached the same consensus value - assert_eq!(value0, value1, "Party 0 and 1 agreed on the same value"); - assert_eq!(value1, value2, "Party 1 and 2 agreed on the same value"); - assert_eq!(value2, value3, "Party 2 and 3 agreed on the same value"); - } } diff --git a/src/test_mocks.rs b/src/test_mocks.rs new file mode 100644 index 0000000..1263933 --- /dev/null +++ b/src/test_mocks.rs @@ -0,0 +1,52 @@ +use crate::config::BPConConfig; +use crate::leader::DefaultLeaderElector; +use crate::party::Party; +use crate::value::{Value, ValueSelector}; +use std::collections::HashMap; +use std::fmt; + +#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Debug, Default)] +pub struct MockValue(u64); + +impl fmt::Display for MockValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "MockValue: {}", self.0) + } +} + +impl Value for MockValue {} + +#[derive(Clone, Default)] +pub struct MockValueSelector; + +impl ValueSelector for MockValueSelector { + fn verify(&self, _: &MockValue, _: &HashMap>) -> bool { + true // For testing, always return true. + } + + fn select(&self, _: &HashMap>) -> MockValue { + MockValue(1) // For testing, always return the same value. + } +} + +impl Default for BPConConfig { + fn default() -> Self { + let weights = vec![1, 1, 1, 1]; + let threshold = BPConConfig::compute_bft_threshold(weights.clone()); + BPConConfig::with_default_timeouts(weights, threshold) + } +} + +pub type MockParty = Party; + +impl Default for MockParty { + fn default() -> Self { + MockParty::new( + Default::default(), + Default::default(), + Default::default(), + Box::new(DefaultLeaderElector::default()), + ) + .0 + } +} diff --git a/tests/mod.rs b/tests/mod.rs new file mode 100644 index 0000000..e81927f --- /dev/null +++ b/tests/mod.rs @@ -0,0 +1,287 @@ +use bpcon::config::BPConConfig; +use bpcon::error::LaunchBallotError; +use bpcon::leader::{DefaultLeaderElector, LeaderElector}; +use bpcon::message::{Message1bContent, MessagePacket}; + +use bpcon::test_mocks::{MockParty, MockValue, MockValueSelector}; + +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::task::JoinHandle; +use tokio::time::{sleep, Duration, Instant}; + +use futures::future::join_all; + +/// Here each party/receiver/sender shall correspond at equal indexes. +type PartiesWithChannels = ( + Vec, + Vec>, + Vec>, +); + +/// Create test parties with predefined generics, based on config. +fn create_parties(cfg: BPConConfig) -> PartiesWithChannels { + (0..cfg.party_weights.len()) + .map(|i| { + MockParty::new( + i as u64, + cfg.clone(), + MockValueSelector, + Box::new(DefaultLeaderElector::new()), + ) + }) + .fold( + (Vec::new(), Vec::new(), Vec::new()), + |(mut parties, mut receivers, mut senders), (p, r, s)| { + parties.push(p); + receivers.push(r); + senders.push(s); + (parties, receivers, senders) + }, + ) +} + +/// Begin ballot process for each party. +fn launch_parties( + parties: Vec, +) -> Vec>> { + parties + .into_iter() + .map(|mut party| tokio::spawn(async move { party.launch_ballot().await })) + .collect() +} + +/// Collect messages from receivers. +fn collect_messages(receivers: &mut [UnboundedReceiver]) -> Vec { + receivers + .iter_mut() + .filter_map(|receiver| receiver.try_recv().ok()) + .collect() +} + +/// Broadcast collected messages to other parties, skipping the sender. +fn broadcast_messages(messages: Vec, senders: &[UnboundedSender]) { + messages.iter().for_each(|msg| { + senders + .iter() + .enumerate() + .filter(|(i, _)| msg.routing.sender != *i as u64) // Skip the current party (sender). + .for_each(|(_, sender_into)| { + sender_into.send(msg.clone()).unwrap(); + }); + }); +} + +/// Propagate messages peer-to-peer between parties using their channels. +fn propagate_p2p( + mut receivers: Vec>, + senders: Vec>, +) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + let messages = collect_messages(receivers.as_mut_slice()); + broadcast_messages(messages, &senders); + + // Reduce processor load. + sleep(Duration::from_millis(1)).await; + } + }) +} + +/// Await completion of each party's process and aggregate results. +async fn await_results( + tasks: Vec>>, +) -> Vec> { + join_all(tasks) + .await + .into_iter() + .map(|res| res.unwrap()) + .collect() +} + +/// Assert consensus reached and log all errors. +fn analyze_ballot(results: Vec>) { + // Partition the results into successful values and errors. + let (successful, errors): (Vec<_>, Vec<_>) = results.into_iter().partition(|res| res.is_ok()); + + // Log errors if any. + if !errors.is_empty() { + for err in errors.into_iter() { + eprintln!("Error during ballot: {:?}", err.unwrap_err()); + } + } + + if successful.is_empty() { + panic!("No consensus, all parties failed."); + } + + // Extract the successful values. + let values: Vec = successful.into_iter().map(|res| res.unwrap()).collect(); + + // Check if we reached consensus: all values should be the same. + if let Some((first_value, rest)) = values.split_first() { + let all_agreed = rest.iter().all(|v| v == first_value); + assert!( + all_agreed, + "No consensus, different values found: {:?}", + values + ); + println!("Consensus reached with value: {:?}", first_value); + } +} + +/// Run ballot on given parties, simulating faulty behavior for given `faulty_ids`. +async fn run_ballot_faulty_party( + parties: PartiesWithChannels, + faulty_ids: Vec, +) -> Vec> { + let (mut parties, mut receivers, mut senders) = parties; + + // Simulate failure excluding faulty parties from all processes: + for id in faulty_ids { + parties.remove(id); + receivers.remove(id); + senders.remove(id); + } + + let ballot_tasks = launch_parties(parties); + let p2p_task = propagate_p2p(receivers, senders); + let results = await_results(ballot_tasks).await; + p2p_task.abort(); + results +} + +#[tokio::test] +async fn test_ballot_happy_case() { + let (parties, receivers, senders) = create_parties(BPConConfig::default()); + let ballot_tasks = launch_parties(parties); + let p2p_task = propagate_p2p(receivers, senders); + let results = await_results(ballot_tasks).await; + p2p_task.abort(); + + analyze_ballot(results); +} + +#[tokio::test] +async fn test_ballot_faulty_party_common() { + let parties = create_parties(BPConConfig::default()); + let elector = DefaultLeaderElector::new(); + let leader = elector.elect_leader(&parties.0[0]).unwrap(); + let faulty_ids: Vec = vec![3]; + for id in faulty_ids.iter() { + assert_ne!( + *id as u64, leader, + "Should not fail the leader for the test to pass" + ); + } + let results = run_ballot_faulty_party(parties, faulty_ids).await; + + analyze_ballot(results); +} + +#[tokio::test] +async fn test_ballot_faulty_party_leader() { + let parties = create_parties(BPConConfig::default()); + let elector = DefaultLeaderElector::new(); + let leader = elector.elect_leader(&parties.0[0]).unwrap(); + let faulty_ids = vec![leader as usize]; + + let results = run_ballot_faulty_party(parties, faulty_ids).await; + + assert!( + results.into_iter().all(|res| res.is_err()), + "All parties should have failed having a faulty leader in the consensus." + ); +} + +#[tokio::test] +async fn test_ballot_malicious_party() { + let (parties, mut receivers, senders) = create_parties(BPConConfig::default()); + + let elector = DefaultLeaderElector::new(); + let leader = elector.elect_leader(&parties[0]).unwrap(); + const MALICIOUS_PARTY_ID: u64 = 1; + + assert_ne!( + MALICIOUS_PARTY_ID, leader, + "Should not make malicious the leader for the test to pass" + ); + + // We will be simulating malicious behaviour + // sending 1b message (meaning, 5/6 times at incorrect stage) with the wrong data. + let content = &Message1bContent { + ballot: parties[0].ballot() + 1, // divergent ballot number + last_ballot_voted: Some(parties[0].ballot() + 1), // early ballot number + // shouldn't put malformed serialized value, because we won't be able to pack it + last_value_voted: None, + }; + let malicious_msg = content.pack(MALICIOUS_PARTY_ID).unwrap(); + + let ballot_tasks = launch_parties(parties); + let p2p_task = tokio::spawn(async move { + // It is responsibility of the external to party code - p2p module + // to rate-limit channels, because otherwise malicious + // actors would be able to DDoS ballot, bloating all the channel with malicious ones. + // For this test to pass, we will send malicious messages once in a while. + let mut last_malicious_message_time = Instant::now(); + let malicious_message_interval = Duration::from_millis(100); + loop { + // Collect all messages first. + let mut messages: Vec<_> = receivers + .iter_mut() + .enumerate() + .filter_map(|(i, receiver)| { + // Skip receiving messages from the malicious party + // to substitute it with invalid one to be propagated. + (i != MALICIOUS_PARTY_ID as usize) + .then(|| receiver.try_recv().ok()) + .flatten() + }) + .collect(); + + // Push the malicious message at intervals. + if last_malicious_message_time.elapsed() >= malicious_message_interval { + messages.push(malicious_msg.clone()); + last_malicious_message_time = Instant::now(); + } + + broadcast_messages(messages, &senders); + + // Delay to simulate network latency. + sleep(Duration::from_millis(100)).await; + } + }); + + let results = await_results(ballot_tasks).await; + p2p_task.abort(); + + analyze_ballot(results); +} + +#[tokio::test] +#[ignore = "takes 20 secs to run, launch manually"] +async fn test_ballot_many_parties() { + const AMOUNT_OF_PARTIES: usize = 999; + let party_weights = vec![1; AMOUNT_OF_PARTIES]; + let threshold = BPConConfig::compute_bft_threshold(party_weights.clone()); + + let cfg = BPConConfig { + party_weights, + threshold, + launch_at: Instant::now(), + launch1a_timeout: Duration::from_secs(0), // 1a's and 2a's are sent only by leader + launch1b_timeout: Duration::from_secs(1), // meaning we need to wait less. + launch2a_timeout: Duration::from_secs(5), + launch2av_timeout: Duration::from_secs(7), + launch2b_timeout: Duration::from_secs(12), + finalize_timeout: Duration::from_secs(19), + grace_period: Duration::from_secs(0), + }; + + let (parties, receivers, senders) = create_parties(cfg); + let ballot_tasks = launch_parties(parties); + let p2p_task = propagate_p2p(receivers, senders); + let results = await_results(ballot_tasks).await; + p2p_task.abort(); + + analyze_ballot(results); +}