diff --git a/Cargo.toml b/Cargo.toml index 4d16495..e1f65ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,14 +2,29 @@ name = "bpcon" version = "0.1.0" edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +description = "BPCon: A Byzantine Fault-Tolerant Consensus Protocol Implementation in Rust." +license = "MIT" +repository = "https://github.com/distributed-lab/bpcon" +homepage = "https://github.com/distributed-lab/bpcon" +documentation = "https://distributed-lab.github.io/bpcon/" +keywords = ["consensus", "byzantine", "protocol", "distributed-systems", "blockchain"] +categories = ["algorithms"] +authors = ["Distributed Lab"] +readme = "README.md" [dependencies] -log = "0.4.22" -rkyv = { version = "0.7.44", features = ["validation"]} -serde = { version = "1.0.207", features = ["derive"] } -bincode = "1.3.3" -tokio = { version = "1.39.2", features = ["full", "test-util"] } -rand = "0.9.0-alpha.2" -seeded-random = "0.6.0" \ No newline at end of file +log = "^0.4.22" +serde = { version = "^1.0.207", features = ["derive"] } +bincode = "^1.3.3" +rkyv = { version = "^0.7.44", features = ["validation"] } +tokio = { version = "^1.39.2", features = ["full"] } +rand = "^0.9.0-alpha.2" +seeded-random = "^0.6.0" +thiserror = "^1.0.63" + +[features] +default = ["full"] +full = ["tokio/full", "rkyv/validation"] + +[dev-dependencies] +tokio = { version = "^1.39.2", features = ["test-util"] } diff --git a/README.md b/README.md index 6d8f44a..b61b98c 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,135 @@ -# BPCon Rust Library +# Weighted BPCon Rust Library [![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0) +[![Rust CI🌌](https://github.com/distributed-lab/bpcon/actions/workflows/rust.yml/badge.svg)](https://github.com/distributed-lab/bpcon/actions/workflows/rust.yml) +[![Docs 🌌](https://github.com/distributed-lab/bpcon/actions/workflows/docs.yml/badge.svg)](https://github.com/distributed-lab/bpcon/actions/workflows/docs.yml) -This is a generic rust implementation of the `BPCon` consensus mechanism. +> This is a rust library implementing weighted BPCon consensus. -## Library Structure +## Documentation -### src/party.rs +Library is documented with `rustdoc`. +Compiled documentation for `main` branch is available at [GitBook](https://distributed-lab.github.io/bpcon). -Main entity in this implementation is `Party` - it represents member of the consensus. +## Usage -External system shall create desired amount of parties. +### Add dependency in your `Cargo.toml` -We have 2 communication channels - one for sending `MessageWire` - encoded in bytes message and routing information, -and the other for pitching consensus events - this allows for external system to impose custom limitations and rules -regarding runway. +```toml +[dependencies] +bpcon = {version = "0.1.0", git = "https://github.com/distributed-lab/bpcon"} +``` -### src/message.rs +### Implement [Value](https://distributed-lab.github.io/bpcon/bpcon/value/trait.Value.html) trait -Definitions of the general message struct, routing information and type-specific contents. +This is a core trait, which defines what type are you selecting in your consensus. +It may be the next block in blockchain, or leader for some operation, or anything else you need. -### src/lib.rs +Below is a simple example, where we will operate on selection for `u64` type. +Using it you may interpret `ID` for leader of distributed operation, for instance. -Here we present a trait for the value on which consensus is being conducted. Additionally, there is a trait for -defining custom value selection rules, called `ValueSelector`. +```rust +... +use bpcon::value::Value; +#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Debug, Hash)] +pub(crate) struct MyValue(u64); + +impl Value for MyValue {} +``` + +### Implement [ValueSelector](https://distributed-lab.github.io/bpcon/bpcon/value/trait.ValueSelector.html) trait + +`BPCon` allows you to define specific conditions how proposer (leader) will select value +and how other members will verify its selection. + +Here is a simple example: + +```rust +... +use bpcon::value::ValueSelector; + +#[derive(Clone)] +pub struct MyValueSelector; + +impl ValueSelector for MyValueSelector { + /// Verifies if the given value `v` has been correctly selected according to the protocol rules. + /// + /// In this basic implementation, we'll consider a value as "correctly selected" if it matches + /// the majority of the values in the provided `HashMap`. + fn verify(&self, v: &MyValue, m: &HashMap>) -> bool { + // Count how many times the value `v` appears in the `HashMap` + let count = m.values().filter(|&val| val.as_ref() == Some(v)).count(); + + // For simplicity, consider the value verified if it appears in more than half of the entries + count > m.len() / 2 + } + + /// Selects a value based on the provided `HashMap` of party votes. + /// + /// This implementation selects the value that appears most frequently in the `HashMap`. + /// If there is a tie, it selects the smallest value (as per the natural ordering of `u64`). + fn select(&self, m: &HashMap>) -> MyValue { + let mut frequency_map = HashMap::new(); + + // Count the frequency of each value in the `HashMap` + for value in m.values().flatten() { + *frequency_map.entry(value).or_insert(0) += 1; + } + + // Find the value with the highest frequency. In case of a tie, select the smallest value. + frequency_map + .into_iter() + .max_by_key(|(value, count)| (*count, value.0)) + .map(|(value, _)| value.clone()) + .expect("No valid value found") + } +} +``` + +### Decide on a [LeaderElector](https://distributed-lab.github.io/bpcon/bpcon/leader/trait.LeaderElector.html) + +`LeaderElector` trait allows you to define specific conditions, how to select leader for consensus. + +__NOTE: it is important to provide deterministic mechanism, +because each participant will compute leader for itself +and in case it is not deterministic, state divergence occurs.__ + +We also provide ready-to-use +[DefaultLeaderElector](https://distributed-lab.github.io/bpcon/bpcon/leader/struct.DefaultLeaderElector.html) +which is using weighted randomization. + +### Configure your ballot + +As a next step, you need to decide on parameters for your ballot: + +1. Amount of parties and their weight. +2. Threshold weight. +3. Time bounds. + +Example: + +```rust +use bpcon::config::BPConConfig; + +let cfg = BPConConfig::with_default_timeouts(vec![1, 1, 1, 1, 1, 1], 4); +``` + +Feel free to explore [config.rs](https://distributed-lab.github.io/bpcon/bpcon/config/struct.BPConConfig.html) +for more information. + +### Create parties + +Having `BPConConfig`, `ValueSelector` and `LeaderElector` defined, instantiate your parties. +Check out [new](https://distributed-lab.github.io/bpcon/bpcon/party/struct.Party.html#method.new) +method on a `Party` struct. + +### Launch ballot on parties and handle messages + +Each party interfaces communication with external system via channels. +In a way, you shall propagate outgoing messages to other parties like: + +1. Listen for outgoing message using `msg_out_receiver`. +2. Forward it to other parties using `msg_in_sender`. + +We welcome you to check `test_end_to_end_ballot` in `party.rs` for example. diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..5bf9bef --- /dev/null +++ b/src/config.rs @@ -0,0 +1,109 @@ +//! Definitions central to BPCon configuration. + +use std::time::Duration; + +/// Configuration structure for BPCon. +/// +/// `BPConConfig` holds the various timeouts and weight configurations necessary +/// for the BPCon consensus protocol. This configuration controls the timing +/// of different stages in the protocol and the distribution of party weights. +#[derive(PartialEq, Eq, Debug, Clone)] +pub struct BPConConfig { + /// Parties weights: `party_weights[i]` corresponds to the i-th party's weight. + /// + /// These weights determine the influence each party has in the consensus process. + /// The sum of these weights is used to calculate the `threshold` for reaching a + /// Byzantine Fault Tolerant (BFT) quorum. + pub party_weights: Vec, + + /// Threshold weight to define BFT quorum. + /// + /// This value must be greater than 2/3 of the total weight of all parties combined. + /// The quorum is the minimum weight required to make decisions in the BPCon protocol. + pub threshold: u128, + + /// Timeout before the ballot is launched. + /// + /// This timeout differs from `launch1a_timeout` as it applies to a distinct status + /// and does not involve listening to external events and messages. + pub launch_timeout: Duration, + + /// Timeout before the 1a stage is launched. + /// + /// The 1a stage is the first phase of the BPCon consensus process, where leader + /// is informing other participants about the start of the ballot. + /// This timeout controls the delay before starting this stage. + pub launch1a_timeout: Duration, + + /// Timeout before the 1b stage is launched. + /// + /// In the 1b stage, participants exchange their last voted ballot number and elected value. + /// This timeout controls the delay before starting this stage. + pub launch1b_timeout: Duration, + + /// Timeout before the 2a stage is launched. + /// + /// The 2a stage involves leader proposing a selected value and + /// other participants verifying it. + /// This timeout controls the delay before starting this stage. + pub launch2a_timeout: Duration, + + /// Timeout before the 2av stage is launched. + /// + /// The 2av stage is where 2a obtained value shall gather needed weight to pass. + /// This timeout controls the delay before starting this stage. + pub launch2av_timeout: Duration, + + /// Timeout before the 2b stage is launched. + /// + /// The 2b stage is where the final value is chosen and broadcasted. + /// This timeout controls the delay before starting this stage. + pub launch2b_timeout: Duration, + + /// Timeout before the finalization stage is launched. + /// + /// The finalization stage is not a part of protocol, but rather internal-centric mechanics + /// to conclude ballot. + /// This timeout controls the delay before starting this stage. + pub finalize_timeout: Duration, + + /// Timeout for a graceful period to accommodate parties with latency. + /// + /// This grace period allows parties with slower communication or processing times + /// to catch up, helping to ensure that all parties can participate fully in the + /// consensus process. + pub grace_period: Duration, +} + +impl BPConConfig { + /// Create a new `BPConConfig` with default timeouts. + /// + /// This method initializes a `BPConConfig` instance using default timeout values for each + /// stage of the BPCon consensus protocol. These defaults are placeholders and should be + /// tuned according to the specific needs and characteristics of the network. + /// + /// # Parameters + /// + /// - `party_weights`: A vector of weights corresponding to each party involved in the consensus. + /// - `threshold`: The weight threshold required to achieve a BFT quorum. + /// + /// # Returns + /// + /// A new `BPConConfig` instance with the provided `party_weights` and `threshold`, + /// and default timeouts for all stages. + pub fn with_default_timeouts(party_weights: Vec, threshold: u128) -> Self { + Self { + party_weights, + threshold, + // TODO: deduce actually good defaults. + launch_timeout: Duration::from_secs(0), + launch1a_timeout: Duration::from_secs(5), + launch1b_timeout: Duration::from_secs(10), + launch2a_timeout: Duration::from_secs(15), + launch2av_timeout: Duration::from_secs(20), + launch2b_timeout: Duration::from_secs(25), + finalize_timeout: Duration::from_secs(30), + grace_period: Duration::from_secs(1), + } + } +} diff --git a/src/error.rs b/src/error.rs index b9d84b8..3999324 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,56 +1,254 @@ -//! Definition of the BPCon errors. - -use std::fmt; - -#[derive(Debug)] -pub enum BallotError { - MessageParsing(String), - ValueParsing(String), - InvalidState(String), - Communication(String), - LeaderElection(String), -} - -impl fmt::Display for BallotError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - BallotError::MessageParsing(ref err) => write!(f, "Message parsing error: {}", err), - BallotError::ValueParsing(ref err) => write!(f, "Value parsing error: {}", err), - BallotError::InvalidState(ref err) => write!(f, "Invalid state error: {}", err), - BallotError::Communication(ref err) => write!(f, "Communication error: {}", err), - BallotError::LeaderElection(ref err) => write!(f, "Leader election error: {}", err), - } +//! Definitions central to BPCon errors. + +use crate::party::{PartyEvent, PartyStatus}; +use crate::value::Value; +use thiserror::Error; + +/// Represents the various errors that can occur during the ballot launch process. +#[derive(Error, Debug)] +pub enum LaunchBallotError { + /// Occurs when an attempt to send a `PartyEvent` fails. + /// + /// - `0`: The `PartyEvent` that failed to send. + /// - `1`: A string detailing the specific failure reason. + #[error("failed to send event {0}: {1}")] + FailedToSendEvent(PartyEvent, String), + + /// Occurs when the event channel is unexpectedly closed. + #[error("event channel closed")] + EventChannelClosed, + + /// Occurs when the message channel is unexpectedly closed. + #[error("message channel closed")] + MessageChannelClosed, + + /// Occurs when following a `PartyEvent` fails. + /// + /// - `0`: The `PartyEvent` that caused the error. + /// - `1`: The specific `FollowEventError` detailing why the follow operation failed. + #[error("failed to follow event {0}: {1}")] + FollowEventError(PartyEvent, FollowEventError), + + /// Occurs during leader election if an error is encountered. + /// + /// - `0`: A string providing details about the leader election failure. + #[error("leader election error: {0}")] + LeaderElectionError(String), +} + +/// Represents the various errors that can occur while following a party event. +#[derive(Error, Debug)] +pub enum FollowEventError { + /// Occurs when there is a mismatch in the expected party status. + /// + /// - `0`: The specific `PartyStatusMismatch` that caused the error. + #[error("{0}")] + PartyStatusMismatch(PartyStatusMismatch), + + /// Occurs when there is an error during serialization. + /// + /// - `0`: The specific `SerializationError` encountered. + #[error("{0}")] + SerializationError(SerializationError), + + /// Occurs when sending a message related to the event fails. + /// + /// - `0`: A string describing the reason for the failure. + #[error("{0}")] + FailedToSendMessage(String), +} + +/// Represents the various errors that can occur when updating the state in a consensus protocol. +/// +/// This enum is generic over `V`, which must implement the `Value` trait. +#[derive(Error, Debug)] +pub enum UpdateStateError { + /// Occurs when there is a mismatch in the expected party status. + /// + /// - `0`: The specific `PartyStatusMismatch` that caused the error. + #[error("{0}")] + PartyStatusMismatch(PartyStatusMismatch), + + /// Occurs when there is a mismatch in the ballot number. + /// + /// - `0`: The specific `BallotNumberMismatch` that caused the error. + #[error("{0}")] + BallotNumberMismatch(BallotNumberMismatch), + + /// Occurs when there is a mismatch in the expected leader. + /// + /// - `0`: The specific `LeaderMismatch` that caused the error. + #[error("{0}")] + LeaderMismatch(LeaderMismatch), + + /// Occurs when there is a mismatch in the expected value. + /// + /// - `0`: The specific `ValueMismatch` that caused the error, where `V` is the type of the value. + #[error("{0}")] + ValueMismatch(ValueMismatch), + + /// Occurs when the value verification fails during the state update process. + #[error("value verification failed")] + ValueVerificationFailed, + + /// Occurs when there is an error during deserialization. + /// + /// - `0`: The specific `DeserializationError` encountered. + #[error("{0}")] + DeserializationError(DeserializationError), +} + +/// Represents an error that occurs when there is a mismatch between the current status of the party +/// and the status required for an operation. +/// +/// This error is typically encountered when an operation requires the party to be in a specific +/// state, but the party is in a different state. +#[derive(Error, Debug)] +#[error( + "party status mismatch: party status is {party_status} whilst needed status is {needed_status}" +)] +pub struct PartyStatusMismatch { + /// The current status of the party. + pub party_status: PartyStatus, + /// The status required for the operation to proceed. + pub needed_status: PartyStatus, +} + +/// Represents an error that occurs when there is a mismatch between the ballot number of the party +/// and the ballot number received in a message. +#[derive(Error, Debug)] +#[error("ballot number mismatch: party's ballot number is {party_ballot_number} whilst received {message_ballot_number} in the message")] +pub struct BallotNumberMismatch { + /// The ballot number held by the party. + pub party_ballot_number: u64, + /// The ballot number received in the message. + pub message_ballot_number: u64, +} + +/// Represents an error that occurs when there is a mismatch between the expected leader +/// of the party and the sender of the message. +#[derive(Error, Debug)] +#[error("leader mismatch: party's leader is {party_leader} whilst the message was sent by {message_sender}")] +pub struct LeaderMismatch { + /// The leader identifier of the party. + pub party_leader: u64, + /// The identifier of the message sender. + pub message_sender: u64, +} + +/// Represents an error that occurs when there is a mismatch between the value held by the party +/// and the value received in a message. +/// +/// This struct is generic over `V`, which must implement the `Value` trait. +#[derive(Error, Debug)] +#[error( + "value mismatch: party's value is {party_value} whilst received {message_value} in the message" +)] +pub struct ValueMismatch { + /// The value held by the party. + pub party_value: V, + /// The value received in the message. + pub message_value: V, +} + +/// Represents the various errors that can occur during the deserialization process. +#[derive(Error, Debug)] +pub enum DeserializationError { + /// Occurs when there is an error deserializing a message. + /// + /// - `0`: A string describing the specific deserialization error for the message. + #[error("message deserialization error: {0}")] + Message(String), + + /// Occurs when there is an error deserializing a value. + /// + /// - `0`: A string describing the specific deserialization error for the value. + #[error("value deserialization error: {0}")] + Value(String), +} + +/// Represents the various errors that can occur during the serialization process. +#[derive(Error, Debug)] +pub enum SerializationError { + /// Occurs when there is an error serializing a message. + /// + /// - `0`: A string describing the specific serialization error for the message. + #[error("message serialization error: {0}")] + Message(String), + + /// Occurs when there is an error serializing a value. + /// + /// - `0`: A string describing the specific serialization error for the value. + #[error("value serialization error: {0}")] + Value(String), +} + +/// Converts a `PartyStatusMismatch` into a `FollowEventError`. +/// +/// This conversion allows `PartyStatusMismatch` errors to be treated as `FollowEventError`, +/// which may occur when there is a status mismatch while following an event. +impl From for FollowEventError { + fn from(error: PartyStatusMismatch) -> Self { + FollowEventError::PartyStatusMismatch(error) + } +} + +/// Converts a `SerializationError` into a `FollowEventError`. +/// +/// This conversion is used when a serialization error occurs during the process +/// of following an event, allowing it to be handled as a `FollowEventError`. +impl From for FollowEventError { + fn from(error: SerializationError) -> Self { + FollowEventError::SerializationError(error) + } +} + +/// Converts a `PartyStatusMismatch` into an `UpdateStateError`. +/// +/// This conversion is useful when a status mismatch occurs while updating the state, +/// allowing the error to be handled within the context of state updates. +impl From for UpdateStateError { + fn from(error: PartyStatusMismatch) -> Self { + UpdateStateError::PartyStatusMismatch(error) } } -impl std::error::Error for BallotError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - // Since these are all simple String errors, there is no underlying source error. - None +/// Converts a `LeaderMismatch` into an `UpdateStateError`. +/// +/// This conversion is used when there is a leader mismatch during state updates, +/// allowing the error to be propagated and handled as an `UpdateStateError`. +impl From for UpdateStateError { + fn from(error: LeaderMismatch) -> Self { + UpdateStateError::LeaderMismatch(error) } } -#[cfg(test)] -mod tests { - use super::*; +/// Converts a `BallotNumberMismatch` into an `UpdateStateError`. +/// +/// This conversion is used when there is a ballot number mismatch during state updates, +/// allowing the error to be propagated and handled as an `UpdateStateError`. +impl From for UpdateStateError { + fn from(error: BallotNumberMismatch) -> Self { + UpdateStateError::BallotNumberMismatch(error) + } +} - #[test] - fn test_ballot_error_message_parsing() { - let error = BallotError::MessageParsing("Parsing failed".into()); - if let BallotError::MessageParsing(msg) = error { - assert_eq!(msg, "Parsing failed"); - } else { - panic!("Expected MessageParsing error"); - } +/// Converts a `ValueMismatch` into an `UpdateStateError`. +/// +/// This conversion is used when there is a value mismatch during state updates, +/// allowing the error to be propagated and handled as an `UpdateStateError`. +impl From> for UpdateStateError { + fn from(error: ValueMismatch) -> Self { + UpdateStateError::ValueMismatch(error) } +} - #[test] - fn test_ballot_error_invalid_state() { - let error = BallotError::InvalidState("Invalid state transition".into()); - if let BallotError::InvalidState(msg) = error { - assert_eq!(msg, "Invalid state transition"); - } else { - panic!("Expected InvalidState error"); - } +/// Converts a `DeserializationError` into an `UpdateStateError`. +/// +/// This conversion is used when a deserialization error occurs during state updates, +/// allowing the error to be handled within the context of state updates. +impl From for UpdateStateError { + fn from(error: DeserializationError) -> Self { + UpdateStateError::DeserializationError(error) } } diff --git a/src/leader.rs b/src/leader.rs new file mode 100644 index 0000000..d565b9a --- /dev/null +++ b/src/leader.rs @@ -0,0 +1,266 @@ +//! Definitions central to BPCon leader election logic. +//! +//! This module defines the traits and structures involved in leader election within the BPCon consensus protocol. +//! The leader election process is crucial in BPCon, ensuring that a single leader is chosen to coordinate the consensus rounds. + +use crate::party::Party; +use crate::value::{Value, ValueSelector}; +use seeded_random::{Random, Seed}; +use std::cmp::Ordering; +use std::hash::{DefaultHasher, Hash, Hasher}; +use thiserror::Error; + +/// A trait that encapsulates the logic for leader election in the BPCon protocol. +/// +/// Implementors of this trait provide the mechanism to elect a leader from the participating parties +/// for a given ballot round. The elected leader is responsible for driving the consensus process +/// during that round. +/// +/// # Type Parameters +/// - `V`: The type of values being proposed and agreed upon in the consensus process, which must implement the `Value` trait. +/// - `VS`: The type of value selector used to choose values during the consensus process, which must implement the `ValueSelector` trait. +pub trait LeaderElector>: Send { + /// Elects a leader for the current ballot. + /// + /// This method returns the ID of the party elected as the leader for the current ballot round. + /// + /// # Parameters + /// - `party`: A reference to the `Party` instance containing information about the current ballot and participating parties. + /// + /// # Returns + /// - `Ok(u64)`: The ID of the elected party. + /// - `Err(Box)`: An error if leader election fails. + fn elect_leader(&self, party: &Party) -> Result>; +} + +/// A default implementation of the `LeaderElector` trait for BPCon. +/// +/// This struct provides a standard mechanism for leader election based on weighted randomization, +/// ensuring deterministic leader selection across multiple rounds. +#[derive(Clone, Debug, Default)] +pub struct DefaultLeaderElector {} + +impl DefaultLeaderElector { + /// Creates a new `DefaultLeaderElector` instance. + pub fn new() -> Self { + Self::default() + } + + /// Computes a seed for randomized leader election. + /// + /// This method generates a seed value based on the party configuration and the current ballot, + /// ensuring that leader election is deterministic but randomized. + /// + /// # Parameters + /// - `party`: A reference to the `Party` instance containing information about the current ballot and participating parties. + /// + /// # Returns + /// A `u64` seed value for use in leader election. + fn compute_seed>(party: &Party) -> u64 { + let mut hasher = DefaultHasher::new(); + + // Hash each field that should contribute to the seed + party.cfg.party_weights.hash(&mut hasher); + party.cfg.threshold.hash(&mut hasher); + party.ballot.hash(&mut hasher); + + // Generate the seed from the hash + hasher.finish() + } + + /// Hashes the seed to a value within a specified range. + /// + /// This method uses the computed seed to generate a value within the range [0, range). + /// The algorithm ensures uniform distribution of the resulting value, which is crucial + /// for fair leader election. + /// + /// # Parameters + /// - `seed`: The seed value used for randomization. + /// - `range`: The upper limit for the random value generation, typically the sum of party weights. + /// + /// # Returns + /// A `u64` value within the specified range. + fn hash_to_range(seed: u64, range: u64) -> u64 { + // Determine the number of bits required to represent the range + let mut k = 64; + while 1u64 << (k - 1) >= range { + k -= 1; + } + + // Use a seeded random generator to produce a value within the desired range + let rng = Random::from_seed(Seed::unsafe_new(seed)); + loop { + let mut raw_res: u64 = rng.gen(); + raw_res >>= 64 - k; + + if raw_res < range { + return raw_res; + } + // If the generated value is not within range, repeat the process + } + } +} + +/// Errors that can occur during leader election using the `DefaultLeaderElector`. +#[derive(Error, Debug)] +pub enum DefaultLeaderElectorError { + /// Error indicating that the sum of party weights is zero, making leader election impossible. + #[error("zero weight sum")] + ZeroWeightSum, +} + +impl> LeaderElector for DefaultLeaderElector { + /// Elects a leader in a weighted randomized manner. + /// + /// This method uses the computed seed and party weights to select a leader. The process is deterministic + /// due to the use of a fixed seed but allows for randomized leader selection based on the weight distribution. + /// + /// # Parameters + /// - `party`: A reference to the `Party` instance containing information about the current ballot and participating parties. + /// + /// # Returns + /// - `Ok(u64)`: The ID of the elected party. + /// - `Err(Box)`: An error if the leader election process fails, such as when the total weight is zero. + fn elect_leader(&self, party: &Party) -> Result> { + let seed = DefaultLeaderElector::compute_seed(party); + + let total_weight: u64 = party.cfg.party_weights.iter().sum(); + if total_weight == 0 { + return Err(DefaultLeaderElectorError::ZeroWeightSum.into()); + } + + // Generate a random number in the range [0, total_weight) + let random_value = DefaultLeaderElector::hash_to_range(seed, total_weight); + + // Use binary search to find the corresponding participant based on the cumulative weight + let mut cumulative_weights = vec![0; party.cfg.party_weights.len()]; + cumulative_weights[0] = party.cfg.party_weights[0]; + + for i in 1..party.cfg.party_weights.len() { + cumulative_weights[i] = cumulative_weights[i - 1] + party.cfg.party_weights[i]; + } + + match cumulative_weights.binary_search_by(|&weight| { + if random_value < weight { + Ordering::Greater + } else { + Ordering::Less + } + }) { + Ok(index) | Err(index) => Ok(index as u64), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::party::tests::{default_config, default_party}; + use rand::Rng; + use std::thread; + use std::time::Duration; + + #[test] + fn test_default_leader_elector_determinism() { + let party = default_party(); + let elector = DefaultLeaderElector::new(); + + let leader1 = elector.elect_leader(&party).unwrap(); + + // Test multiple iterations to ensure the leader remains the same + for i in 2..=10 { + let leader = elector.elect_leader(&party).unwrap(); + assert_eq!( + leader1, leader, + "Leaders should be consistent on repeated calls (iteration {})", + i + ); + } + } + + #[test] + fn test_default_leader_elector_fail_with_zero_weights() { + let mut party = default_party(); + let mut cfg = default_config(); + cfg.party_weights = vec![0, 0, 0]; + party.cfg = cfg; + + let elector = DefaultLeaderElector::new(); + + match elector.elect_leader(&party) { + Err(_) => {} // This is the expected behavior + _ => panic!("Expected DefaultLeaderElectorError::ZeroWeightSum"), + } + } + + fn debug_hash_to_range_new(seed: u64, range: u64) -> u64 { + assert!(range > 1); + + let mut k = 64; + while 1u64 << (k - 1) >= range { + k -= 1; + } + + let rng = Random::from_seed(Seed::unsafe_new(seed)); + + let mut iteration = 1u64; + loop { + let mut raw_res: u64 = rng.gen(); + raw_res >>= 64 - k; + + if raw_res < range { + return raw_res; + } + + iteration += 1; + assert!(iteration <= 50) + } + } + + #[test] + #[ignore] // Ignoring since it takes a while to run + fn test_hash_range_random() { + // Test the uniform distribution + + const N: usize = 37; + const M: i64 = 10000000; + + let mut cnt1: [i64; N] = [0; N]; + + for _ in 0..M { + let mut rng = rand::thread_rng(); + let seed: u64 = rng.random(); + + let res1 = debug_hash_to_range_new(seed, N as u64); + assert!(res1 < N as u64); + + cnt1[res1 as usize] += 1; + } + + println!("1: {:?}", cnt1); + + let mut avg1: i64 = 0; + + for item in cnt1.iter().take(N) { + avg1 += (M / (N as i64) - item).abs(); + } + + avg1 /= N as i64; + + println!("Avg 1: {}", avg1); + } + + #[test] + fn test_rng() { + let rng1 = Random::from_seed(Seed::unsafe_new(123456)); + let rng2 = Random::from_seed(Seed::unsafe_new(123456)); + + println!("{}", rng1.gen::()); + println!("{}", rng2.gen::()); + + thread::sleep(Duration::from_secs(2)); + + println!("{}", rng1.gen::()); + println!("{}", rng2.gen::()); + } +} diff --git a/src/lib.rs b/src/lib.rs index afa03b0..db83c12 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,23 +1,6 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; - -mod error; +pub mod config; +pub mod error; +pub mod leader; pub mod message; pub mod party; - -/// General trait for value itself. -pub trait Value: Eq + Serialize + for<'a> Deserialize<'a> + Clone {} - -/// Trait for value selector and verificator. -/// Value selection and verification may depend on different conditions for different values. -/// Note that value selection should follow the rules of BPCon: only safe values can be selected. -/// Party can not vote for different values, even in different ballots. -pub trait ValueSelector { - /// Verifies if a value is selected correctly. Accepts 2b messages from parties. - fn verify(&self, v: &V, m: &HashMap>) -> bool; - - /// Select value depending on inner conditions. Accepts 2b messages from parties. - fn select(&self, m: &HashMap>) -> V; - - // TODO: add other fields to update selector state. -} +pub mod value; diff --git a/src/message.rs b/src/message.rs index daa86ab..fdb6302 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,28 +1,36 @@ -//! Definition of the BPCon messages. +//! Definitions central to BPCon messages. +//! +//! This module defines the structures and functionality for messages used in the BPCon consensus protocol. +//! It includes message contents, routing information, and utilities for packing and unpacking messages. -use rkyv::{AlignedVec, Archive, Deserialize, Serialize}; +use crate::error::{DeserializationError, SerializationError}; +use rkyv::{AlignedVec, Archive, Deserialize, Infallible, Serialize}; +use std::collections::HashSet; -/// Message ready for transfer. +/// A message ready for transfer within the BPCon protocol. +#[derive(Debug, Clone)] pub struct MessagePacket { /// Serialized message contents. pub content_bytes: AlignedVec, - /// Routing information. + /// Routing information for the message. pub routing: MessageRouting, } -/// Full routing information for the message. +/// Full routing information for a BPCon message. +/// This struct is used to manage and route messages through the BPCon protocol. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] pub struct MessageRouting { - /// Which participant this message came from. + /// The ID of the participant who sent the message. pub sender: u64, - /// Where this message should be delivered. Can be empty if `is_broadcast` is `true` - pub receivers: Vec, - /// Indicates whether this message shall be broadcast to other participants. - pub is_broadcast: bool, - /// Stores the BPCon message type. + /// The type of BPCon protocol message. pub msg_type: ProtocolMessage, } -/// Representation of message types of the consensus. +/// Enumeration of the different types of protocol messages used in BPCon. +/// +/// These message types represent the various stages of the BPCon consensus protocol, +/// each corresponding to a specific phase in the process. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] pub enum ProtocolMessage { Msg1a, Msg1b, @@ -31,99 +39,208 @@ pub enum ProtocolMessage { Msg2b, } -// Value in messages is stored in serialized format, i.e bytes in order to omit -// strict restriction for `Value` trait to be [de]serializable only with `rkyv`. +impl std::fmt::Display for ProtocolMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ProtocolMessage: {:?}", self) + } +} + +// The following structures represent the contents of different BPCon protocol messages. +// Each message type is serialized before being sent, and deserialized upon receipt. +// Value in messages is stored in serialized format, i.e., bytes, to avoid strict restrictions +// on the `Value` trait being [de]serializable only with `rkyv`. + +/// Contents of a BPCon message of type 1a. #[derive(Archive, Deserialize, Serialize, Debug, Clone)] #[archive(compare(PartialEq), check_bytes)] #[archive_attr(derive(Debug))] pub struct Message1aContent { + /// The ballot number associated with this message. pub ballot: u64, } +/// Contents of a BPCon message of type 1b. #[derive(Archive, Deserialize, Serialize, Debug, Clone)] #[archive(compare(PartialEq), check_bytes)] #[archive_attr(derive(Debug))] pub struct Message1bContent { + /// The ballot number associated with this message. pub ballot: u64, + /// The last ballot number that was voted on, if any. pub last_ballot_voted: Option, + /// The last value that was voted on, serialized as a vector of bytes, if any. pub last_value_voted: Option>, } +/// Contents of a BPCon message of type 2a. #[derive(Archive, Deserialize, Serialize, Debug, Clone)] #[archive(compare(PartialEq), check_bytes)] #[archive_attr(derive(Debug))] pub struct Message2aContent { + /// The ballot number associated with this message. pub ballot: u64, + /// The proposed value, serialized as a vector of bytes. pub value: Vec, } +/// Contents of a BPCon message of type 2av. #[derive(Archive, Deserialize, Serialize, Debug, Clone)] #[archive(compare(PartialEq), check_bytes)] #[archive_attr(derive(Debug))] pub struct Message2avContent { + /// The ballot number associated with this message. pub ballot: u64, + /// The received value, serialized as a vector of bytes. pub received_value: Vec, } +/// Contents of a BPCon message of type 2b. #[derive(Archive, Deserialize, Serialize, Debug, Clone)] #[archive(compare(PartialEq), check_bytes)] #[archive_attr(derive(Debug))] pub struct Message2bContent { + /// The ballot number associated with this message. pub ballot: u64, } -impl Message1aContent { - pub fn get_routing(id: u64) -> MessageRouting { - MessageRouting { - sender: id, - receivers: vec![], - is_broadcast: true, - msg_type: ProtocolMessage::Msg1a, +// Macro to implement packing and unpacking logic for each message content type. +// This logic handles serialization to bytes and deserialization from bytes, as well as setting up routing information. + +macro_rules! impl_packable { + ($type:ty, $msg_type:expr) => { + impl $type { + /// Packs the message content into a `MessagePacket` for transfer. + /// + /// This method serializes the message content and combines it with routing information. + /// + /// # Parameters + /// + /// - `sender`: The ID of the participant sending the message. + /// + /// # Returns + /// + /// A `MessagePacket` containing the serialized message and routing information, or a `SerializationError` if packing fails. + pub fn pack(&self, sender: u64) -> Result { + let content_bytes = rkyv::to_bytes::<_, 256>(self) + .map_err(|err| SerializationError::Message(err.to_string()))?; + Ok(MessagePacket { + content_bytes, + routing: Self::route(sender), + }) + } + + /// Unpacks a `MessagePacket` to extract the original message content. + /// + /// This method deserializes the message content from the byte representation stored in the packet. + /// + /// # Parameters + /// + /// - `msg`: A reference to the `MessagePacket` to be unpacked. + /// + /// # Returns + /// + /// The deserialized message content, or a `DeserializationError` if unpacking fails. + pub fn unpack(msg: &MessagePacket) -> Result { + let archived = rkyv::check_archived_root::(msg.content_bytes.as_slice()) + .map_err(|err| DeserializationError::Message(err.to_string()))?; + archived + .deserialize(&mut Infallible) + .map_err(|err| DeserializationError::Message(err.to_string())) + } + + /// Creates routing information for the message. + /// + /// This method sets up the routing details, including the sender and the message type. + /// + /// # Parameters + /// + /// - `sender`: The ID of the participant sending the message. + /// + /// # Returns + /// + /// A `MessageRouting` instance containing the routing information. + fn route(sender: u64) -> MessageRouting { + MessageRouting { + sender, + msg_type: $msg_type, + } + } } - } + }; } -impl Message1bContent { - pub fn get_routing(id: u64) -> MessageRouting { - MessageRouting { - sender: id, - receivers: vec![], - is_broadcast: true, - msg_type: ProtocolMessage::Msg1b, - } - } +// Implement the packing and unpacking functionality for each message content type. +impl_packable!(Message1aContent, ProtocolMessage::Msg1a); +impl_packable!(Message1bContent, ProtocolMessage::Msg1b); +impl_packable!(Message2aContent, ProtocolMessage::Msg2a); +impl_packable!(Message2avContent, ProtocolMessage::Msg2av); +impl_packable!(Message2bContent, ProtocolMessage::Msg2b); + +/// A struct to keep track of senders and the cumulative weight of their messages. +/// +/// `MessageRoundState` is used to monitor the participants who have sent messages and to accumulate +/// the total weight of these messages. This helps in determining when a quorum has been reached. +#[derive(PartialEq, Eq, Clone, Debug, Default)] +pub struct MessageRoundState { + senders: HashSet, + weight: u128, } -impl Message2aContent { - pub fn get_routing(id: u64) -> MessageRouting { - MessageRouting { - sender: id, - receivers: vec![], - is_broadcast: true, - msg_type: ProtocolMessage::Msg2a, - } +impl MessageRoundState { + /// Creates a new, empty `MessageRoundState`. + /// + /// This method initializes an empty state with no senders and zero weight. + /// + /// # Returns + /// + /// A new `MessageRoundState` instance. + pub fn new() -> Self { + Self::default() } -} -impl Message2avContent { - pub fn get_routing(id: u64) -> MessageRouting { - MessageRouting { - sender: id, - receivers: vec![], - is_broadcast: true, - msg_type: ProtocolMessage::Msg2av, - } + /// Returns the cumulative weight of all messages received in this round. + /// + /// # Returns + /// + /// The total weight of all messages that have been added to this state. + pub fn get_weight(&self) -> u128 { + self.weight } -} -impl Message2bContent { - pub fn get_routing(id: u64) -> MessageRouting { - MessageRouting { - sender: id, - receivers: vec![], - is_broadcast: true, - msg_type: ProtocolMessage::Msg2b, - } + /// Adds a sender and their corresponding weight to the state. + /// + /// This method updates the state to include the specified sender and increments the cumulative + /// weight by the specified amount. + /// + /// # Parameters + /// + /// - `sender`: The ID of the participant sending the message. + /// - `weight`: The weight associated with this sender's message. + pub fn add_sender(&mut self, sender: u64, weight: u128) { + self.senders.insert(sender); + self.weight += weight; + } + + /// Checks if a sender has already sent a message in this round. + /// + /// # Parameters + /// + /// - `sender`: A reference to the ID of the participant. + /// + /// # Returns + /// + /// `true` if the sender has already sent a message, `false` otherwise. + pub fn contains_sender(&self, sender: &u64) -> bool { + self.senders.contains(sender) + } + + /// Resets the state for a new round. + /// + /// This method clears all recorded senders and resets the cumulative weight to zero, + /// preparing the state for a new round of message collection. + pub fn reset(&mut self) { + self.senders.clear(); + self.weight = 0; } } diff --git a/src/party.rs b/src/party.rs index c32ddd1..8282dc6 100644 --- a/src/party.rs +++ b/src/party.rs @@ -1,61 +1,38 @@ -//! Definition of the BPCon participant structure. - -use crate::error::BallotError; +//! Definitions central to BPCon participant. +//! +//! This module contains the implementation of a `Party` in the BPCon consensus protocol. +//! The `Party` manages the execution of the ballot, handles incoming messages, and coordinates with other participants +//! to reach a consensus. It uses various components such as leader election and value selection to perform its duties. + +use crate::config::BPConConfig; +use crate::error::FollowEventError::FailedToSendMessage; +use crate::error::LaunchBallotError::{ + EventChannelClosed, FailedToSendEvent, LeaderElectionError, MessageChannelClosed, +}; +use crate::error::UpdateStateError::ValueVerificationFailed; +use crate::error::{ + BallotNumberMismatch, DeserializationError, FollowEventError, LaunchBallotError, + LeaderMismatch, PartyStatusMismatch, SerializationError, UpdateStateError, ValueMismatch, +}; +use crate::leader::LeaderElector; use crate::message::{ Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent, - MessagePacket, MessageRouting, ProtocolMessage, + MessagePacket, MessageRoundState, ProtocolMessage, }; -use crate::{Value, ValueSelector}; -use rkyv::{AlignedVec, Deserialize, Infallible}; -use seeded_random::{Random, Seed}; -use std::cmp::{Ordering, PartialEq}; -use std::collections::hash_map::DefaultHasher; +use crate::value::{Value, ValueSelector}; +use log::{debug, warn}; +use std::cmp::PartialEq; use std::collections::hash_map::Entry::Vacant; -use std::collections::{HashMap, HashSet}; -use std::hash::{Hash, Hasher}; +use std::collections::HashMap; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::time::{self, Duration}; - -/// BPCon configuration. Includes ballot time bounds and other stuff. -#[derive(PartialEq, Eq, Debug, Clone)] -pub struct BPConConfig { - /// Parties weights: `party_weights[i]` corresponds to the i-th party weight - pub party_weights: Vec, - - /// Threshold weight to define BFT quorum: should be > 2/3 of total weight - pub threshold: u128, - - /// Timeout before ballot is launched. - /// Differs from `launch1a_timeout` having another status and not listening - /// to external events and messages. - pub launch_timeout: Duration, - - /// Timeout before 1a stage is launched. - pub launch1a_timeout: Duration, - - /// Timeout before 1b stage is launched. - pub launch1b_timeout: Duration, - - /// Timeout before 2a stage is launched. - pub launch2a_timeout: Duration, - - /// Timeout before 2av stage is launched. - pub launch2av_timeout: Duration, - - /// Timeout before 2b stage is launched. - pub launch2b_timeout: Duration, +use tokio::time::sleep; - /// Timeout before finalization stage is launched. - pub finalize_timeout: Duration, - - /// Timeout for a graceful period to help parties with latency. - pub grace_period: Duration, -} - -/// Party status defines the statuses of the ballot for the particular participant -/// depending on local calculations. -#[derive(PartialEq, Debug)] -pub(crate) enum PartyStatus { +/// Represents the status of a `Party` in the BPCon consensus protocol. +/// +/// The status indicates the current phase or outcome of the ballot execution for this party. +/// It transitions through various states as the party progresses through the protocol. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub enum PartyStatus { None, Launched, Passed1a, @@ -67,9 +44,17 @@ pub(crate) enum PartyStatus { Failed, } -/// Party events is used for the ballot flow control. -#[derive(PartialEq, Debug)] -pub(crate) enum PartyEvent { +impl std::fmt::Display for PartyStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PartyStatus: {:?}", self) + } +} + +/// Represents the events that control the flow of the ballot process in a `Party`. +/// +/// These events trigger transitions between different phases of the protocol. +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub enum PartyEvent { Launch1a, Launch1b, Launch2a, @@ -78,194 +63,96 @@ pub(crate) enum PartyEvent { Finalize, } -/// A struct to keep track of senders and the cumulative weight of their messages. -#[derive(PartialEq, Debug)] -struct MessageRoundState { - senders: HashSet, - weight: u128, -} - -impl MessageRoundState { - /// Creates a new instance of `MessageRoundState`. - fn new() -> Self { - Self { - senders: HashSet::new(), - weight: 0, - } - } - - /// Adds a sender and their corresponding weight. - fn add_sender(&mut self, sender: u64, weight: u128) { - self.senders.insert(sender); - self.weight += weight; - } - - /// Checks if the sender has already sent a message. - fn contains_sender(&self, sender: &u64) -> bool { - self.senders.contains(sender) - } - - /// Resets the state. - fn reset(&mut self) { - self.senders.clear(); - self.weight = 0; +impl std::fmt::Display for PartyEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PartyEvent: {:?}", self) } } -/// Trait incorporating logic for leader election. -pub trait LeaderElector>: Send { - /// Get leader for current ballot. - /// Returns id of the elected party or error. - fn get_leader(&self, party: &Party) -> Result; -} - -pub struct DefaultLeaderElector {} - -impl DefaultLeaderElector { - /// Compute seed for randomized leader election. - fn compute_seed>(party: &Party) -> u64 { - let mut hasher = DefaultHasher::new(); - - // Hash each field that should contribute to the seed - party.cfg.party_weights.hash(&mut hasher); - party.cfg.threshold.hash(&mut hasher); - party.ballot.hash(&mut hasher); - - // You can add more fields as needed - - // Generate the seed from the hash - hasher.finish() - } - - /// Hash the seed to a value within a given range. - fn hash_to_range(seed: u64, range: u64) -> u64 { - // Select the `k` suck that value 2^k >= `range` and 2^k is the smallest. - let mut k = 64; - while 1u64 << (k - 1) >= range { - k -= 1; - } - - // The following algorithm selects a random u64 value using `ChaCha12Rng` - // and reduces the result to the k-bits such that 2^k >= `range` the closes power of to the `range`. - // After we check if the result lies in [0..`range`) or [`range`..2^k). - // In the first case result is an acceptable value generated uniformly. - // In the second case we repeat the process again with the incremented iterations counter. - // Ref: Practical Cryptography 1st Edition by Niels Ferguson, Bruce Schneier, paragraph 10.8 - let rng = Random::from_seed(Seed::unsafe_new(seed)); - loop { - let mut raw_res: u64 = rng.gen(); - raw_res >>= 64 - k; - - if raw_res < range { - return raw_res; - } - // Executing this loop does not require a large number of iterations. - // Check tests for more info - } - } -} - -impl> LeaderElector for DefaultLeaderElector { - /// Compute leader in a weighed randomized manner. - /// Uses seed from the config, making it deterministic. - fn get_leader(&self, party: &Party) -> Result { - let seed = DefaultLeaderElector::compute_seed(party); - - let total_weight: u64 = party.cfg.party_weights.iter().sum(); - if total_weight == 0 { - return Err(BallotError::LeaderElection("Zero weight sum".into())); - } - - // Generate a random number in the range [0, total_weight) - let random_value = DefaultLeaderElector::hash_to_range(seed, total_weight); - - // Use binary search to find the corresponding participant - let mut cumulative_weights = vec![0; party.cfg.party_weights.len()]; - cumulative_weights[0] = party.cfg.party_weights[0]; - - for i in 1..party.cfg.party_weights.len() { - cumulative_weights[i] = cumulative_weights[i - 1] + party.cfg.party_weights[i]; - } - - match cumulative_weights.binary_search_by(|&weight| { - if random_value < weight { - Ordering::Greater - } else { - Ordering::Less - } - }) { - Ok(index) | Err(index) => Ok(index as u64), - } - } -} -/// Party of the BPCon protocol that executes ballot. +/// A participant in the BPCon protocol responsible for executing ballots. /// -/// The communication between party and external -/// system is done via `in_receiver` and `out_sender` channels. External system should take -/// care about authentication while receiving incoming messages and then push them to the -/// corresponding `Sender` to the `in_receiver`. Same, it should tke care about listening of new -/// messages in the corresponding `Receiver` to the `out_sender` and submitting them to the -/// corresponding party based on information in `MessageRouting`. +/// A `Party` manages the execution of ballots by communicating with other parties, processing incoming messages, +/// and following the protocol's steps. It uses an internal state machine to track its progress through the ballot process. /// -/// After finishing of the ballot protocol, party will place the selected value to the -/// `value_sender` or `BallotError` if ballot failed. +/// # Communication +/// - `msg_in_receiver`: Receives incoming messages from other parties. +/// - `msg_out_sender`: Sends outgoing messages to other parties. +/// - `event_receiver`: Receives events that drive the ballot process. +/// - `event_sender`: Sends events to trigger actions in the ballot process. +/// +/// The `Party` operates within a BPCon configuration and relies on a `ValueSelector` to choose values during the consensus process. +/// A `LeaderElector` is used to determine the leader for each ballot. pub struct Party> { - /// This party's identifier. + /// The identifier of this party. pub id: u64, - /// Communication queues. + /// Queue for receiving incoming messages. msg_in_receiver: UnboundedReceiver, + /// Queue for sending outgoing messages. msg_out_sender: UnboundedSender, - /// Query to receive and send events that run ballot protocol + /// Queue for receiving events that control the ballot process. event_receiver: UnboundedReceiver, + /// Queue for sending events that control the ballot process. event_sender: UnboundedSender, - /// BPCon config (e.g. ballot time bounds, parties weights, etc.). - cfg: BPConConfig, + /// BPCon configuration settings, including timeouts and party weights. + pub(crate) cfg: BPConConfig, - /// Main functional for value selection. + /// Component responsible for selecting values during the consensus process. value_selector: VS, - /// Main functional for leader election. + /// Component responsible for electing a leader for each ballot. elector: Box>, - /// Status of the ballot execution + /// The current status of the ballot execution for this party. status: PartyStatus, - /// Current ballot number - ballot: u64, + /// The current ballot number. + pub(crate) ballot: u64, - /// Current ballot leader + /// The leader for the current ballot. leader: u64, - /// Last ballot where party submitted 2b message + /// The last ballot number where this party submitted a 2b message. last_ballot_voted: Option, - /// Last value for which party submitted 2b message + /// The last value for which this party submitted a 2b message. last_value_voted: Option, - /// Local round fields - - /// 1b round state - /// - parties_voted_before: HashMap>, // id <-> value + // Local round fields + /// The state of 1b round, tracking which parties have voted and their corresponding values. + parties_voted_before: HashMap>, + /// The cumulative weight of 1b messages received. messages_1b_weight: u128, - /// 2a round state - /// + /// The state of 2a round, storing the value proposed by this party. value_2a: Option, - /// 2av round state - /// + /// The state of 2av round, tracking which parties have confirmed the 2a value. messages_2av_state: MessageRoundState, - /// 2b round state - /// + /// The state of 2b round, tracking which parties have sent 2b messages. messages_2b_state: MessageRoundState, } impl> Party { + /// Creates a new `Party` instance. + /// + /// This constructor sets up the party with the given ID, BPCon configuration, value selector, and leader elector. + /// It also initializes communication channels for receiving and sending messages and events. + /// + /// # Parameters + /// - `id`: The unique identifier for this party. + /// - `cfg`: The BPCon configuration settings. + /// - `value_selector`: The component responsible for selecting values during the consensus process. + /// - `elector`: The component responsible for electing the leader for each ballot. + /// + /// # Returns + /// - A tuple containing: + /// - The new `Party` instance. + /// - The `UnboundedReceiver` for outgoing messages. + /// - The `UnboundedSender` for incoming messages. pub fn new( id: u64, cfg: BPConConfig, @@ -306,18 +193,32 @@ impl> Party { ) } + /// Returns the current ballot number. pub fn ballot(&self) -> u64 { self.ballot } + /// Checks if the ballot process has been launched. + /// + /// # Returns + /// - `true` if the ballot is currently active; `false` otherwise. pub fn is_launched(&self) -> bool { !self.is_stopped() } + /// Checks if the ballot process has been stopped. + /// + /// # Returns + /// - `true` if the ballot has finished or failed; `false` otherwise. pub fn is_stopped(&self) -> bool { self.status == PartyStatus::Finished || self.status == PartyStatus::Failed } + /// Retrieves the selected value if the ballot process has finished successfully. + /// + /// # Returns + /// - `Some(V)` if the ballot reached a consensus and the value was selected. + /// - `None` if the ballot did not reach consensus or is still ongoing. pub fn get_value_selected(&self) -> Option { // Only `Finished` status means reached BFT agreement if self.status == PartyStatus::Finished { @@ -327,22 +228,38 @@ impl> Party { None } + /// Selects a value based on the current state of the party. + /// + /// This method delegates to the `ValueSelector` to determine the value that should be selected + /// based on the votes received in the 1b round. + /// + /// # Returns + /// - The value selected by the `ValueSelector`. fn get_value(&self) -> V { self.value_selector.select(&self.parties_voted_before) } - pub async fn launch_ballot(&mut self) -> Result, BallotError> { + /// Launches the ballot process. + /// + /// This method initiates the ballot process, advancing through the different phases of the protocol + /// by sending and receiving events and messages. It handles timeouts for each phase and processes + /// incoming messages to update the party's state. + /// + /// # Returns + /// - `Ok(Some(V))`: The selected value if the ballot reaches consensus. + /// - `Ok(None)`: If the ballot process is terminated without reaching consensus. + /// - `Err(LaunchBallotError)`: If an error occurs during the ballot process. + pub async fn launch_ballot(&mut self) -> Result, LaunchBallotError> { self.prepare_next_ballot()?; - time::sleep(self.cfg.launch_timeout).await; - self.status = PartyStatus::Launched; + sleep(self.cfg.launch_timeout).await; - let launch1a_timer = time::sleep(self.cfg.launch1a_timeout); - let launch1b_timer = time::sleep(self.cfg.launch1b_timeout); - let launch2a_timer = time::sleep(self.cfg.launch2a_timeout); - let launch2av_timer = time::sleep(self.cfg.launch2av_timeout); - let launch2b_timer = time::sleep(self.cfg.launch2b_timeout); - let finalize_timer = time::sleep(self.cfg.finalize_timeout); + let launch1a_timer = sleep(self.cfg.launch1a_timeout); + let launch1b_timer = sleep(self.cfg.launch1b_timeout); + let launch2a_timer = sleep(self.cfg.launch2a_timeout); + let launch2av_timer = sleep(self.cfg.launch2av_timeout); + let launch2b_timer = sleep(self.cfg.launch2b_timeout); + let finalize_timer = sleep(self.cfg.finalize_timeout); tokio::pin!( launch1a_timer, @@ -363,69 +280,71 @@ impl> Party { while self.is_launched() { tokio::select! { _ = &mut launch1a_timer, if !launch1a_fired => { - self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { + self.event_sender.send(PartyEvent::Launch1a).map_err(|err| { self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch1a event".into()) + FailedToSendEvent(PartyEvent::Launch1a, err.to_string()) })?; launch1a_fired = true; }, _ = &mut launch1b_timer, if !launch1b_fired => { - self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { + self.event_sender.send(PartyEvent::Launch1b).map_err(|err| { self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch1b event".into()) + FailedToSendEvent(PartyEvent::Launch1b, err.to_string()) })?; launch1b_fired = true; }, _ = &mut launch2a_timer, if !launch2a_fired => { - self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { + self.event_sender.send(PartyEvent::Launch2a).map_err(|err| { self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2a event".into()) + FailedToSendEvent(PartyEvent::Launch2a, err.to_string()) })?; launch2a_fired = true; }, _ = &mut launch2av_timer, if !launch2av_fired => { - self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { + self.event_sender.send(PartyEvent::Launch2av).map_err(|err| { self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2av event".into()) + FailedToSendEvent(PartyEvent::Launch2av, err.to_string()) })?; launch2av_fired = true; }, _ = &mut launch2b_timer, if !launch2b_fired => { - self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { + self.event_sender.send(PartyEvent::Launch2b).map_err(|err| { self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Launch2b event".into()) + FailedToSendEvent(PartyEvent::Launch2b, err.to_string()) })?; launch2b_fired = true; }, _ = &mut finalize_timer, if !finalize_fired => { - self.event_sender.send(PartyEvent::Finalize).map_err(|_| { + self.event_sender.send(PartyEvent::Finalize).map_err(|err| { self.status = PartyStatus::Failed; - BallotError::Communication("Failed to send Finalize event".into()) + FailedToSendEvent(PartyEvent::Finalize, err.to_string()) })?; finalize_fired = true; }, - msg_wire = self.msg_in_receiver.recv() => { - tokio::time::sleep(self.cfg.grace_period).await; - if let Some(msg_wire) = msg_wire { - if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { - self.status = PartyStatus::Failed; - return Err(err); + msg = self.msg_in_receiver.recv() => { + sleep(self.cfg.grace_period).await; + if let Some(msg) = msg { + debug!("Party {} received {} from party {}", self.id, msg.routing.msg_type, msg.routing.sender); + if let Err(err) = self.update_state(&msg) { + // Shouldn't fail the party, since invalid message + // may be sent by anyone. + warn!("Failed to update state with {}, got error: {err}", msg.routing.msg_type) } }else if self.msg_in_receiver.is_closed(){ self.status = PartyStatus::Failed; - return Err(BallotError::Communication("msg-in channel closed".into())); + return Err(MessageChannelClosed) } }, event = self.event_receiver.recv() => { - tokio::time::sleep(self.cfg.grace_period).await; + sleep(self.cfg.grace_period).await; if let Some(event) = event { if let Err(err) = self.follow_event(event) { self.status = PartyStatus::Failed; - return Err(err); + return Err(LaunchBallotError::FollowEventError(event, err)); } }else if self.event_receiver.is_closed(){ self.status = PartyStatus::Failed; - return Err(BallotError::Communication("event receiver channel closed".into())); + return Err(EventChannelClosed) } }, } @@ -434,13 +353,29 @@ impl> Party { Ok(self.get_value_selected()) } - /// Prepare state before running a ballot. - fn prepare_next_ballot(&mut self) -> Result<(), BallotError> { - self.status = PartyStatus::None; + /// Prepares the party's state for the next ballot. + /// + /// This method resets the party's state, increments the ballot number, and elects a new leader. + /// + /// # Returns + /// - `Ok(())`: If the preparation is successful. + /// - `Err(LaunchBallotError)`: If an error occurs during leader election. + fn prepare_next_ballot(&mut self) -> Result<(), LaunchBallotError> { + self.reset_state(); self.ballot += 1; - self.leader = self.elector.get_leader(self)?; + self.status = PartyStatus::Launched; + self.leader = self + .elector + .elect_leader(self) + .map_err(|err| LeaderElectionError(err.to_string()))?; + + Ok(()) + } - // Clean state + /// Resets the party's state for a new round of ballot execution. + /// + /// This method clears the state associated with previous rounds and prepares the party for the next ballot. + fn reset_state(&mut self) { self.parties_voted_before = HashMap::new(); self.messages_1b_weight = 0; self.value_2a = None; @@ -450,77 +385,87 @@ impl> Party { // Cleaning channels while self.event_receiver.try_recv().is_ok() {} while self.msg_in_receiver.try_recv().is_ok() {} - - self.status = PartyStatus::Launched; - Ok(()) } - /// Update party's state based on message type. - fn update_state(&mut self, m: AlignedVec, routing: MessageRouting) -> Result<(), BallotError> { + /// Updates the party's state based on an incoming message. + /// + /// This method processes a message according to its type and updates the party's internal state accordingly. + /// It performs validation checks to ensure that the message is consistent with the current ballot and protocol rules. + /// + /// # Parameters + /// - `msg`: The incoming `MessagePacket` to be processed. + /// + /// # Returns + /// - `Ok(())`: If the state is successfully updated. + /// - `Err(UpdateStateError)`: If an error occurs during the update, such as a mismatch in the ballot number or leader. + fn update_state(&mut self, msg: &MessagePacket) -> Result<(), UpdateStateError> { + let routing = msg.routing; + match routing.msg_type { ProtocolMessage::Msg1a => { if self.status != PartyStatus::Launched { - return Err(BallotError::InvalidState( - "Received Msg1a message, while party status is not ".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Launched, + } + .into()); } - let archived = - rkyv::check_archived_root::(&m[..]).map_err(|err| { - BallotError::MessageParsing(format!("Validation error: {:?}", err)) - })?; - let msg: Message1aContent = - archived.deserialize(&mut Infallible).map_err(|err| { - BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) - })?; + let msg = Message1aContent::unpack(msg)?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState( - "Ballot number mismatch in Msg1a".into(), - )); + return Err(BallotNumberMismatch { + party_ballot_number: self.ballot, + message_ballot_number: msg.ballot, + } + .into()); } if routing.sender != self.leader { - return Err(BallotError::InvalidState("Invalid leader in Msg1a".into())); + return Err(LeaderMismatch { + party_leader: self.leader, + message_sender: routing.sender, + } + .into()); } self.status = PartyStatus::Passed1a; } ProtocolMessage::Msg1b => { if self.status != PartyStatus::Passed1a { - return Err(BallotError::InvalidState( - "Received Msg1b message, while party status is not ".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed1a, + } + .into()); } - let archived = - rkyv::check_archived_root::(&m[..]).map_err(|err| { - BallotError::MessageParsing(format!("Validation error: {:?}", err)) - })?; - let msg: Message1bContent = - archived.deserialize(&mut Infallible).map_err(|err| { - BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) - })?; + let msg = Message1bContent::unpack(msg)?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState( - "Ballot number mismatch in Msg1b".into(), - )); + return Err(BallotNumberMismatch { + party_ballot_number: self.ballot, + message_ballot_number: msg.ballot, + } + .into()); } if let Some(last_ballot_voted) = msg.last_ballot_voted { if last_ballot_voted >= self.ballot { - return Err(BallotError::InvalidState( - "Received outdated 1b message".into(), - )); + return Err(BallotNumberMismatch { + party_ballot_number: self.ballot, + message_ballot_number: msg.ballot, + } + .into()); } } if let Vacant(e) = self.parties_voted_before.entry(routing.sender) { let value: Option = match msg.last_value_voted { - Some(ref data) => Some(bincode::deserialize(data).map_err(|err| { - BallotError::ValueParsing(format!("Deserialization error: {:?}", err)) - })?), + Some(ref data) => Some( + bincode::deserialize(data) + .map_err(|err| DeserializationError::Value(err.to_string()))?, + ), None => None, }; @@ -536,33 +481,33 @@ impl> Party { } ProtocolMessage::Msg2a => { if self.status != PartyStatus::Passed1b { - return Err(BallotError::InvalidState( - "Received Msg2a message, while party status is not ".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed1b, + } + .into()); } - let archived = - rkyv::check_archived_root::(&m[..]).map_err(|err| { - BallotError::MessageParsing(format!("Validation error: {:?}", err)) - })?; - let msg: Message2aContent = - archived.deserialize(&mut Infallible).map_err(|err| { - BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) - })?; + let msg = Message2aContent::unpack(msg)?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState( - "Ballot number mismatch in Msg2a".into(), - )); + return Err(BallotNumberMismatch { + party_ballot_number: self.ballot, + message_ballot_number: msg.ballot, + } + .into()); } if routing.sender != self.leader { - return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); + return Err(LeaderMismatch { + party_leader: self.leader, + message_sender: routing.sender, + } + .into()); } - let value_received = bincode::deserialize(&msg.value[..]).map_err(|err| { - BallotError::ValueParsing(format!("Failed to parse value in Msg2a: {:?}", err)) - })?; + let value_received = bincode::deserialize(&msg.value[..]) + .map_err(|err| DeserializationError::Value(err.to_string()))?; if self .value_selector @@ -571,44 +516,36 @@ impl> Party { self.status = PartyStatus::Passed2a; self.value_2a = Some(value_received); } else { - return Err(BallotError::InvalidState( - "Failed to verify value in Msg2a".into(), - )); + return Err(ValueVerificationFailed); } } ProtocolMessage::Msg2av => { if self.status != PartyStatus::Passed2a { - return Err(BallotError::InvalidState( - "Received Msg2av message, while party status is not ".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed2a, + } + .into()); } - let archived = - rkyv::check_archived_root::(&m[..]).map_err(|err| { - BallotError::MessageParsing(format!("Validation error: {:?}", err)) - })?; - let msg: Message2avContent = - archived.deserialize(&mut Infallible).map_err(|err| { - BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) - })?; + let msg = Message2avContent::unpack(msg)?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState( - "Ballot number mismatch in Msg2av".into(), - )); + return Err(BallotNumberMismatch { + party_ballot_number: self.ballot, + message_ballot_number: msg.ballot, + } + .into()); } - let value_received: V = - bincode::deserialize(&msg.received_value[..]).map_err(|err| { - BallotError::ValueParsing(format!( - "Failed to parse value in Msg2av: {:?}", - err - )) - })?; + let value_received: V = bincode::deserialize(&msg.received_value[..]) + .map_err(|err| DeserializationError::Value(err.to_string()))?; if value_received != self.value_2a.clone().unwrap() { - return Err(BallotError::InvalidState( - "Received different value in Msg2av".into(), - )); + return Err(ValueMismatch { + party_value: self.value_2a.clone().unwrap(), + message_value: value_received.clone(), + } + .into()); } if !self.messages_2av_state.contains_sender(&routing.sender) { @@ -617,31 +554,28 @@ impl> Party { self.cfg.party_weights[routing.sender as usize] as u128, ); - if self.messages_2av_state.weight > self.cfg.threshold { + if self.messages_2av_state.get_weight() > self.cfg.threshold { self.status = PartyStatus::Passed2av; } } } ProtocolMessage::Msg2b => { if self.status != PartyStatus::Passed2av { - return Err(BallotError::InvalidState( - "Received Msg2b message, while party status is not ".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed2av, + } + .into()); } - let archived = - rkyv::check_archived_root::(&m[..]).map_err(|err| { - BallotError::MessageParsing(format!("Validation error: {:?}", err)) - })?; - let msg: Message2bContent = - archived.deserialize(&mut Infallible).map_err(|err| { - BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) - })?; + let msg = Message2bContent::unpack(msg)?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState( - "Ballot number mismatch in Msg2b".into(), - )); + return Err(BallotNumberMismatch { + party_ballot_number: self.ballot, + message_ballot_number: msg.ballot, + } + .into()); } if self.messages_2av_state.contains_sender(&routing.sender) @@ -652,7 +586,7 @@ impl> Party { self.cfg.party_weights[routing.sender as usize] as u128, ); - if self.messages_2b_state.weight > self.cfg.threshold { + if self.messages_2b_state.get_weight() > self.cfg.threshold { self.status = PartyStatus::Passed2b; } } @@ -662,126 +596,143 @@ impl> Party { } /// Executes ballot actions according to the received event. - fn follow_event(&mut self, event: PartyEvent) -> Result<(), BallotError> { + /// + /// This method processes an event and triggers the corresponding action in the ballot process, + /// such as launching a new phase or finalizing the ballot. + /// + /// # Parameters + /// - `event`: The `PartyEvent` to process. + /// + /// # Returns + /// - `Ok(())`: If the event is successfully processed. + /// - `Err(FollowEventError)`: If an error occurs while processing the event. + fn follow_event(&mut self, event: PartyEvent) -> Result<(), FollowEventError> { match event { PartyEvent::Launch1a => { if self.status != PartyStatus::Launched { - return Err(BallotError::InvalidState( - "Cannot launch 1a, incorrect state".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Launched, + } + .into()); } + if self.leader == self.id { + let content = &Message1aContent { + ballot: self.ballot, + }; + let msg = content.pack(self.id)?; + self.msg_out_sender - .send(MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&Message1aContent { - ballot: self.ballot, - }) - .map_err(|_| { - BallotError::MessageParsing("Failed to serialize Msg1a".into()) - })?, - routing: Message1aContent::get_routing(self.id), - }) - .map_err(|_| BallotError::Communication("Failed to send Msg1a".into()))?; + .send(msg) + .map_err(|err| FailedToSendMessage(err.to_string()))?; + self.status = PartyStatus::Passed1a; } } PartyEvent::Launch1b => { if self.status != PartyStatus::Passed1a { - return Err(BallotError::InvalidState( - "Cannot launch 1b, incorrect state".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed1a, + } + .into()); } - self.msg_out_sender - .send(MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&Message1bContent { - ballot: self.ballot, - last_ballot_voted: self.last_ballot_voted, - last_value_voted: self - .last_value_voted - .clone() - .map(|inner_data| { - bincode::serialize(&inner_data).map_err(|_| { - BallotError::ValueParsing( - "Failed to serialize value".into(), - ) - }) - }) - .transpose()?, - }) - .map_err(|_| { - BallotError::MessageParsing("Failed to serialize Msg1b".into()) - })?, - routing: Message1bContent::get_routing(self.id), + + let last_value_voted = self + .last_value_voted + .clone() + .map(|inner_data| { + bincode::serialize(&inner_data) + .map_err(|err| SerializationError::Value(err.to_string())) }) - .map_err(|_| BallotError::Communication("Failed to send Msg1b".into()))?; + .transpose()?; + + let content = &Message1bContent { + ballot: self.ballot, + last_ballot_voted: self.last_ballot_voted, + last_value_voted, + }; + let msg = content.pack(self.id)?; + + self.msg_out_sender + .send(msg) + .map_err(|err| FailedToSendMessage(err.to_string()))?; } PartyEvent::Launch2a => { if self.status != PartyStatus::Passed1b { - return Err(BallotError::InvalidState( - "Cannot launch 2a, incorrect state".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed1b, + } + .into()); } if self.leader == self.id { + let value = bincode::serialize(&self.get_value()) + .map_err(|err| SerializationError::Value(err.to_string()))?; + + let content = &Message2aContent { + ballot: self.ballot, + value, + }; + let msg = content.pack(self.id)?; + self.msg_out_sender - .send(MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&Message2aContent { - ballot: self.ballot, - value: bincode::serialize(&self.get_value()).map_err(|_| { - BallotError::ValueParsing("Failed to serialize value".into()) - })?, - }) - .map_err(|_| { - BallotError::MessageParsing("Failed to serialize Msg2a".into()) - })?, - routing: Message2aContent::get_routing(self.id), - }) - .map_err(|_| BallotError::Communication("Failed to send Msg2a".into()))?; + .send(msg) + .map_err(|err| FailedToSendMessage(err.to_string()))?; + + self.value_2a = Some(self.get_value()); + self.status = PartyStatus::Passed2a; } } PartyEvent::Launch2av => { if self.status != PartyStatus::Passed2a { - return Err(BallotError::InvalidState( - "Cannot launch 2av, incorrect state".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed2a, + } + .into()); } + + let received_value = bincode::serialize(&self.value_2a.clone().unwrap()) + .map_err(|err| SerializationError::Value(err.to_string()))?; + + let content = &Message2avContent { + ballot: self.ballot, + received_value, + }; + let msg = content.pack(self.id)?; + self.msg_out_sender - .send(MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&Message2avContent { - ballot: self.ballot, - received_value: bincode::serialize(&self.value_2a.clone()).map_err( - |_| BallotError::ValueParsing("Failed to serialize value".into()), - )?, - }) - .map_err(|_| { - BallotError::MessageParsing("Failed to serialize Msg2av".into()) - })?, - routing: Message2avContent::get_routing(self.id), - }) - .map_err(|_| BallotError::Communication("Failed to send Msg2av".into()))?; + .send(msg) + .map_err(|err| FailedToSendMessage(err.to_string()))?; } PartyEvent::Launch2b => { if self.status != PartyStatus::Passed2av { - return Err(BallotError::InvalidState( - "Cannot launch 2b, incorrect state".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed2av, + } + .into()); } + + let content = &Message2bContent { + ballot: self.ballot, + }; + let msg = content.pack(self.id)?; + self.msg_out_sender - .send(MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&Message2bContent { - ballot: self.ballot, - }) - .map_err(|_| { - BallotError::MessageParsing("Failed to serialize Msg2b".into()) - })?, - routing: Message2bContent::get_routing(self.id), - }) - .map_err(|_| BallotError::Communication("Failed to send Msg2b".into()))?; + .send(msg) + .map_err(|err| FailedToSendMessage(err.to_string()))?; } PartyEvent::Finalize => { if self.status != PartyStatus::Passed2b { - return Err(BallotError::InvalidState( - "Cannot finalize, incorrect state".into(), - )); + return Err(PartyStatusMismatch { + party_status: self.status, + needed_status: PartyStatus::Passed2av, + } + .into()); } + self.status = PartyStatus::Finished; } } @@ -790,22 +741,29 @@ impl> Party { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; - use rand::Rng; - use seeded_random::{Random, Seed}; + use crate::leader::DefaultLeaderElector; + use crate::party::PartyStatus::{Launched, Passed1a, Passed1b, Passed2a}; use std::collections::HashMap; - use std::thread; + use std::fmt::{Display, Formatter}; + use std::time::Duration; + use tokio::time; + + #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Debug)] + pub(crate) struct MockValue(u64); - // Mock implementation of Value - #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] - struct MockValue(u64); // Simple mock type wrapping an integer + impl Display for MockValue { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MockValue: {}", self.0) + } + } impl Value for MockValue {} - // Mock implementation of ValueSelector - struct MockValueSelector; + #[derive(Clone)] + pub(crate) struct MockValueSelector; impl ValueSelector for MockValueSelector { fn verify(&self, _v: &MockValue, _m: &HashMap>) -> bool { @@ -813,331 +771,133 @@ mod tests { } fn select(&self, _m: &HashMap>) -> MockValue { - MockValue(42) // For testing, always return the same value - } - } - - fn default_config() -> BPConConfig { - BPConConfig { - party_weights: vec![1, 2, 3], - threshold: 4, - launch_timeout: Duration::from_secs(0), - launch1a_timeout: Duration::from_secs(0), - launch1b_timeout: Duration::from_secs(10), - launch2a_timeout: Duration::from_secs(20), - launch2av_timeout: Duration::from_secs(30), - launch2b_timeout: Duration::from_secs(40), - finalize_timeout: Duration::from_secs(50), - grace_period: Duration::from_secs(1), + MockValue(1) // For testing, always return the same value } } - #[test] - fn test_compute_leader_determinism() { - let cfg = default_config(); - let party = Party::::new( - 0, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), - ) - .0; - - // Compute the leader multiple times - let leader1 = party.elector.get_leader(&party).unwrap(); - let leader2 = party.elector.get_leader(&party).unwrap(); - let leader3 = party.elector.get_leader(&party).unwrap(); - - // All leaders should be the same due to deterministic seed - assert_eq!( - leader1, leader2, - "Leaders should be consistent on repeated calls" - ); - assert_eq!( - leader2, leader3, - "Leaders should be consistent on repeated calls" - ); + pub(crate) fn default_config() -> BPConConfig { + BPConConfig::with_default_timeouts(vec![1, 2, 3], 4) } - #[test] - fn test_compute_leader_zero_weights() { - let mut cfg = default_config(); - cfg.party_weights = vec![0, 0, 0]; - - let party = Party::::new( + pub(crate) fn default_party() -> Party { + Party::::new( 0, - cfg, + default_config(), MockValueSelector, - Box::new(DefaultLeaderElector {}), + Box::new(DefaultLeaderElector::new()), ) - .0; - - match party.elector.get_leader(&party) { - Err(BallotError::LeaderElection(_)) => { - // The test passes if the error is of type LeaderElection - } - _ => panic!("Expected BallotError::LeaderElection"), - } + .0 } #[test] fn test_update_state_msg1a() { - let cfg = default_config(); - let mut party = Party::::new( - 0, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), - ) - .0; - party.status = PartyStatus::Launched; - party.ballot = 1; - - // Must send this message from leader of the ballot. - party.leader = 0; // this party's id - - let msg = Message1aContent { ballot: 1 }; - let routing = MessageRouting { - sender: 0, - receivers: vec![2, 3], - is_broadcast: false, - msg_type: ProtocolMessage::Msg1a, + let mut party = default_party(); + party.status = Launched; + let content = Message1aContent { + ballot: party.ballot, }; + let msg = content.pack(party.leader).unwrap(); - let msg_wire = MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&msg).unwrap(), - routing, - }; + party.update_state(&msg).unwrap(); - party - .update_state(msg_wire.content_bytes, msg_wire.routing) - .unwrap(); - assert_eq!(party.status, PartyStatus::Passed1a); + assert_eq!(party.status, Passed1a); } #[test] fn test_update_state_msg1b() { - let cfg = default_config(); - let mut party = Party::::new( - 0, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), - ) - .0; - party.status = PartyStatus::Passed1a; - party.ballot = 1; + let mut party = default_party(); + party.status = Passed1a; - // First, send a 1b message from party 1 (weight 2) - let msg1 = Message1bContent { - ballot: 1, - last_ballot_voted: Some(0), + let content = Message1bContent { + ballot: party.ballot, + last_ballot_voted: None, last_value_voted: bincode::serialize(&MockValue(42)).ok(), }; - let routing1 = MessageRouting { - sender: 1, // Party 1 sends the message - receivers: vec![0], - is_broadcast: false, - msg_type: ProtocolMessage::Msg1b, - }; - - let msg_wire1 = MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), - routing: routing1, - }; - party - .update_state(msg_wire1.content_bytes, msg_wire1.routing) - .unwrap(); - - // Now, send a 1b message from party 2 (weight 3) - let msg2 = Message1bContent { - ballot: 1, - last_ballot_voted: Some(0), - last_value_voted: bincode::serialize(&MockValue(42)).ok(), - }; - let routing2 = MessageRouting { - sender: 2, // Party 2 sends the message - receivers: vec![0], - is_broadcast: false, - msg_type: ProtocolMessage::Msg1b, - }; + // First, send a 1b message from party 1 (weight 2) + let msg = content.pack(1).unwrap(); + party.update_state(&msg).unwrap(); - let msg_wire2 = MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), - routing: routing2, - }; - - party - .update_state(msg_wire2.content_bytes, msg_wire2.routing) - .unwrap(); + // Then, send a 1b message from party 2 (weight 3) + let msg = content.pack(2).unwrap(); + party.update_state(&msg).unwrap(); // After both messages, the cumulative weight is 2 + 3 = 5, which exceeds the threshold - assert_eq!(party.status, PartyStatus::Passed1b); + assert_eq!(party.status, Passed1b); } #[test] fn test_update_state_msg2a() { - let cfg = default_config(); - let mut party = Party::::new( - 0, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), - ) - .0; - party.status = PartyStatus::Passed1b; - party.ballot = 1; + let mut party = default_party(); + party.status = Passed1b; + party.leader = 1; - // Must send this message from leader of the ballot. - party.leader = 0; // this party's id - - let msg = Message2aContent { - ballot: 1, + let content = Message2aContent { + ballot: party.ballot, value: bincode::serialize(&MockValue(42)).unwrap(), }; - let routing = MessageRouting { - sender: 0, - receivers: vec![0], - is_broadcast: false, - msg_type: ProtocolMessage::Msg2a, - }; + let msg = content.pack(1).unwrap(); + party.update_state(&msg).unwrap(); - let msg_wire = MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&msg).unwrap(), - routing, - }; - - party - .update_state(msg_wire.content_bytes, msg_wire.routing) - .unwrap(); - - assert_eq!(party.status, PartyStatus::Passed2a); + assert_eq!(party.status, Passed2a); } #[test] fn test_update_state_msg2av() { - let cfg = default_config(); - let mut party = Party::::new( - 0, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), - ) - .0; - party.status = PartyStatus::Passed2a; - party.ballot = 1; - party.value_2a = Some(MockValue(42)); - - // Send first 2av message from party 2 (weight 3) - let msg1 = Message2avContent { - ballot: 1, - received_value: bincode::serialize(&MockValue(42)).unwrap(), - }; - let routing1 = MessageRouting { - sender: 2, - receivers: vec![0], - is_broadcast: false, - msg_type: ProtocolMessage::Msg2av, - }; - - let msg_wire1 = MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), - routing: routing1, - }; - - party - .update_state(msg_wire1.content_bytes, msg_wire1.routing) - .unwrap(); + let mut party = default_party(); + party.status = Passed2a; + party.value_2a = Some(MockValue(1)); - // Now send a second 2av message from party 1 (weight 2) - let msg2 = Message2avContent { - ballot: 1, - received_value: bincode::serialize(&MockValue(42)).unwrap(), - }; - let routing2 = MessageRouting { - sender: 1, - receivers: vec![0], - is_broadcast: false, - msg_type: ProtocolMessage::Msg2av, + let content = Message2avContent { + ballot: party.ballot, + received_value: bincode::serialize(&MockValue(1)).unwrap(), }; - let msg_wire2 = MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), - routing: routing2, - }; + // Send first 2av message from party 1 (weight 2) + let msg = content.pack(1).unwrap(); + party.update_state(&msg).unwrap(); - party - .update_state(msg_wire2.content_bytes, msg_wire2.routing) - .unwrap(); + // Now send a second 2av message from party 2 (weight 3) + let msg = content.pack(2).unwrap(); + party.update_state(&msg).unwrap(); - // The cumulative weight (3 + 2) should exceed the threshold of 4 + // The cumulative weight (2 + 3) should exceed the threshold of 4 assert_eq!(party.status, PartyStatus::Passed2av); } #[test] fn test_update_state_msg2b() { - let cfg = default_config(); - let mut party = Party::::new( - 0, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), - ) - .0; + let mut party = default_party(); party.status = PartyStatus::Passed2av; - party.ballot = 1; - - // Simulate that both party 2 and party 1 already sent 2av messages - party.messages_2av_state.add_sender(1, 1); - party.messages_2av_state.add_sender(2, 2); - - // Send first 2b message from party 2 (weight 3) - let msg1 = Message2bContent { ballot: 1 }; - let routing1 = MessageRouting { - sender: 2, - receivers: vec![0], - is_broadcast: false, - msg_type: ProtocolMessage::Msg2b, - }; - let msg_wire1 = MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), - routing: routing1, + // Simulate that both party 1 and party 2 already sent 2av messages + party.messages_2av_state.add_sender(1, 2); + party.messages_2av_state.add_sender(2, 3); + + let content = Message2bContent { + ballot: party.ballot, }; - party - .update_state(msg_wire1.content_bytes, msg_wire1.routing) - .unwrap(); + // Send first 2b message from party 1 (weight 2) + let msg = content.pack(1).unwrap(); + party.update_state(&msg).unwrap(); // Print the current state and weight println!( - "After first Msg2b: Status = {:?}, 2b Weight = {}", - party.status, party.messages_2b_state.weight + "After first Msg2b: Status = {}, 2b Weight = {}", + party.status, + party.messages_2b_state.get_weight() ); - // Now send a second 2b message from party 1 (weight 2) - let msg2 = Message2bContent { ballot: 1 }; - let routing2 = MessageRouting { - sender: 1, - receivers: vec![0], - is_broadcast: false, - msg_type: ProtocolMessage::Msg2b, - }; - - let msg_wire2 = MessagePacket { - content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), - routing: routing2, - }; - - party - .update_state(msg_wire2.content_bytes, msg_wire2.routing) - .unwrap(); + // Now send a second 2b message from party 2 (weight 3) + let msg = content.pack(2).unwrap(); + party.update_state(&msg).unwrap(); // Print the current state and weight println!( - "After second Msg2b: Status = {:?}, 2b Weight = {}", - party.status, party.messages_2b_state.weight + "After second Msg2b: Status = {}, 2b Weight = {}", + party.status, + party.messages_2b_state.get_weight() ); // The cumulative weight (3 + 2) should exceed the threshold of 4 @@ -1147,81 +907,47 @@ mod tests { #[test] fn test_follow_event_launch1a() { let cfg = default_config(); - let (mut party, _msg_out_receiver, _msg_in_sender) = - Party::::new( - 0, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), - ); - - party.status = PartyStatus::Launched; - party.ballot = 1; - - party - .follow_event(PartyEvent::Launch1a) - .expect("Failed to follow Launch1a event"); - - // If the party is the leader and in the Launched state, the event should trigger a message. - assert_eq!(party.status, PartyStatus::Launched); // Status remains Launched, as no state change expected here - } - - #[test] - fn test_ballot_reset_after_failure() { - let cfg = default_config(); - let (mut party, _, _) = Party::::new( + // Need to take ownership of msg_out_receiver, so that sender doesn't close, + // since otherwise msg_out_receiver will be dropped. + let (mut party, _msg_out_receiver, _) = Party::::new( 0, cfg, MockValueSelector, Box::new(DefaultLeaderElector {}), ); - party.status = PartyStatus::Failed; - party.ballot = 1; + party.status = Launched; + party.leader = party.id; - party.prepare_next_ballot().unwrap(); + party + .follow_event(PartyEvent::Launch1a) + .expect("Failed to follow Launch1a event"); - // Check that state has been reset - assert_eq!(party.status, PartyStatus::Launched); - assert_eq!(party.ballot, 2); // Ballot number should have incremented - assert!(party.parties_voted_before.is_empty()); - assert_eq!(party.messages_1b_weight, 0); - assert!(party.messages_2av_state.senders.is_empty()); - assert_eq!(party.messages_2av_state.weight, 0); - assert!(party.messages_2b_state.senders.is_empty()); - assert_eq!(party.messages_2b_state.weight, 0); + // If the party is the leader and in the Launched state, the event should trigger a message. + // And it's status shall update to Passed1a after sending 1a message, + // contrary to other participants, whose `Passed1a` updates only after receiving 1a message. + assert_eq!(party.status, Passed1a); } #[test] fn test_follow_event_communication_failure() { - let cfg = default_config(); - - // This party id is precomputed for this specific party_weights, threshold and ballot. - // Because we need leader to send 1a. - let party_id = 0; - - let (mut party, msg_out_receiver, _) = Party::::new( - party_id, - cfg, - MockValueSelector, - Box::new(DefaultLeaderElector {}), - ); - - party.status = PartyStatus::Launched; - party.ballot = 1; - - drop(msg_out_receiver); // Drop the receiver to simulate a communication failure + // msg_out_receiver channel, bound to corresponding sender, which will try to use + // follow event, is getting dropped since we don't take ownership of it + // upon creation of the party + let mut party = default_party(); + party.status = Launched; + party.leader = party.id; let result = party.follow_event(PartyEvent::Launch1a); match result { - Err(BallotError::Communication(err_msg)) => { - assert_eq!( - err_msg, "Failed to send Msg1a", - "Expected specific communication error message" - ); + Err(FailedToSendMessage(_)) => { + // this is expected outcome } - _ => panic!("Expected BallotError::Communication, got {:?}", result), + _ => panic!( + "Expected FollowEventError::FailedToSendMessage, got {:?}", + result + ), } } @@ -1232,6 +958,7 @@ mod tests { // Set up the Party with necessary configuration let cfg = default_config(); + let (event_sender, mut event_receiver) = unbounded_channel(); // Need to return all 3 values, so that they don't get dropped @@ -1241,7 +968,7 @@ mod tests { 0, cfg.clone(), MockValueSelector, - Box::new(DefaultLeaderElector {}), + Box::new(DefaultLeaderElector::new()), ); // Same here, we would like to not lose party's event_receiver, so that test doesn't fail. @@ -1253,7 +980,10 @@ mod tests { party.launch_ballot().await.unwrap(); }); + time::advance(cfg.launch_timeout).await; + // Sequential time advance and event check + time::advance(cfg.launch1a_timeout).await; assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Launch1a); @@ -1273,74 +1003,155 @@ mod tests { assert_eq!(event_receiver.recv().await.unwrap(), PartyEvent::Finalize); } - fn debug_hash_to_range_new(seed: u64, range: u64) -> u64 { - assert!(range > 1); - - let mut k = 64; - while 1u64 << (k - 1) >= range { - k -= 1; - } + #[tokio::test] + async fn test_end_to_end_ballot() { + // Configuration for the parties + let cfg = BPConConfig::with_default_timeouts(vec![1, 1, 1, 1], 2); - let rng = Random::from_seed(Seed::unsafe_new(seed)); + // ValueSelector and LeaderElector instances + let value_selector = MockValueSelector; + let leader_elector = Box::new(DefaultLeaderElector::new()); - let mut iteration = 1u64; - loop { - let mut raw_res: u64 = rng.gen(); - raw_res >>= 64 - k; + // Create 4 parties + let (mut party0, msg_out_receiver0, msg_in_sender0) = + Party::::new( + 0, + cfg.clone(), + value_selector.clone(), + leader_elector.clone(), + ); + let (mut party1, msg_out_receiver1, msg_in_sender1) = + Party::::new( + 1, + cfg.clone(), + value_selector.clone(), + leader_elector.clone(), + ); + let (mut party2, msg_out_receiver2, msg_in_sender2) = + Party::::new( + 2, + cfg.clone(), + value_selector.clone(), + leader_elector.clone(), + ); + let (mut party3, msg_out_receiver3, msg_in_sender3) = + Party::::new( + 3, + cfg.clone(), + value_selector.clone(), + leader_elector.clone(), + ); - if raw_res < range { - return raw_res; + // Channels for receiving the selected values + let (value_sender0, value_receiver0) = tokio::sync::oneshot::channel(); + let (value_sender1, value_receiver1) = tokio::sync::oneshot::channel(); + let (value_sender2, value_receiver2) = tokio::sync::oneshot::channel(); + let (value_sender3, value_receiver3) = tokio::sync::oneshot::channel(); + + // Launch ballot tasks for each party + let ballot_task0 = tokio::spawn(async move { + match party0.launch_ballot().await { + Ok(Some(value)) => { + value_sender0.send(value).unwrap(); + } + Ok(None) => { + eprintln!("Party 0: No value was selected"); + } + Err(err) => { + eprintln!("Party 0 encountered an error: {:?}", err); + } } + }); - iteration += 1; - assert!(iteration <= 50) - } - } - - #[test] - #[ignore] // Ignoring since it takes a while to run - fn test_hash_range_random() { - // test the uniform distribution - - const N: usize = 37; - const M: i64 = 10000000; - - let mut cnt1: [i64; N] = [0; N]; - - for _ in 0..M { - let mut rng = rand::thread_rng(); - let seed: u64 = rng.random(); - - let res1 = debug_hash_to_range_new(seed, N as u64); - assert!(res1 < N as u64); - - cnt1[res1 as usize] += 1; - } - - println!("1: {:?}", cnt1); - - let mut avg1: i64 = 0; - - for item in cnt1.iter().take(N) { - avg1 += (M / (N as i64) - item).abs(); - } - - avg1 /= N as i64; + let ballot_task1 = tokio::spawn(async move { + match party1.launch_ballot().await { + Ok(Some(value)) => { + value_sender1.send(value).unwrap(); + } + Ok(None) => { + eprintln!("Party 1: No value was selected"); + } + Err(err) => { + eprintln!("Party 1 encountered an error: {:?}", err); + } + } + }); - println!("Avg 1: {}", avg1); - } + let ballot_task2 = tokio::spawn(async move { + match party2.launch_ballot().await { + Ok(Some(value)) => { + value_sender2.send(value).unwrap(); + } + Ok(None) => { + eprintln!("Party 2: No value was selected"); + } + Err(err) => { + eprintln!("Party 2 encountered an error: {:?}", err); + } + } + }); - #[test] - fn test_rng() { - let rng1 = Random::from_seed(Seed::unsafe_new(123456)); - let rng2 = Random::from_seed(Seed::unsafe_new(123456)); + let ballot_task3 = tokio::spawn(async move { + match party3.launch_ballot().await { + Ok(Some(value)) => { + value_sender3.send(value).unwrap(); + } + Ok(None) => { + eprintln!("Party 3: No value was selected"); + } + Err(err) => { + eprintln!("Party 3 encountered an error: {:?}", err); + } + } + }); - println!("{}", rng1.gen::()); - println!("{}", rng2.gen::()); + // Simulate message passing between the parties + tokio::spawn(async move { + let mut receivers = [ + msg_out_receiver0, + msg_out_receiver1, + msg_out_receiver2, + msg_out_receiver3, + ]; + let senders = [ + msg_in_sender0, + msg_in_sender1, + msg_in_sender2, + msg_in_sender3, + ]; + + loop { + for (i, receiver) in receivers.iter_mut().enumerate() { + if let Ok(msg) = receiver.try_recv() { + // Broadcast the message to all other parties + for (j, sender) in senders.iter().enumerate() { + if i != j { + sender.send(msg.clone()).unwrap(); + } + } + } + } - thread::sleep(Duration::from_secs(2)); + // Delay to simulate network latency + sleep(Duration::from_millis(100)).await; + } + }); - println!("{}", rng1.gen::()); - println!("{}", rng2.gen::()); + // Await the completion of ballot tasks + ballot_task0.await.unwrap(); + ballot_task1.await.unwrap(); + ballot_task2.await.unwrap(); + ballot_task3.await.unwrap(); + + // Await results from each party + let value0 = value_receiver0.await.unwrap(); + let value1 = value_receiver1.await.unwrap(); + let value2 = value_receiver2.await.unwrap(); + let value3 = value_receiver3.await.unwrap(); + + // Check that all parties reached the same consensus value + assert_eq!(value0, value1, "Party 0 and 1 agreed on the same value"); + assert_eq!(value1, value2, "Party 1 and 2 agreed on the same value"); + assert_eq!(value2, value3, "Party 2 and 3 agreed on the same value"); } } diff --git a/src/value.rs b/src/value.rs new file mode 100644 index 0000000..95056c7 --- /dev/null +++ b/src/value.rs @@ -0,0 +1,60 @@ +//! Definitions central to BPCon values. +//! +//! This module defines traits and structures related to the values used within the BPCon consensus protocol. +//! It includes a general `Value` trait for handling values in the protocol, as well as a `ValueSelector` trait +//! for implementing value selection and verification logic. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fmt::{Debug, Display}; + +/// A general trait representing a value in the BPCon consensus protocol. +/// +/// Implementing this trait ensures that values can be safely transmitted and logged during the +/// consensus process. +pub trait Value: Eq + Serialize + for<'a> Deserialize<'a> + Clone + Debug + Display {} + +/// A trait for selecting and verifying values in the BPCon consensus protocol. +/// +/// The `ValueSelector` trait provides the functionality for verifying that a value has been +/// selected according to the rules of the protocol and for selecting a value based on +/// specific conditions. +/// +/// ## Important Rules: +/// - **Safety**: Value selection must adhere to the BPCon protocol rules, meaning only "safe" values +/// can be selected. This implies that a party should not vote for different values, even across +/// different ballots. +/// - **Consensus Compliance**: The selection process should consider the state of messages (typically +/// from 1b messages) sent by other parties, ensuring the selected value is compliant with the +/// collective state of the consensus. +/// +/// # Type Parameters +/// - `V`: The type of value being selected and verified. This must implement the `Value` trait. +pub trait ValueSelector: Clone { + /// Verifies if a value has been correctly selected. + /// + /// This method checks if the provided value `v` is selected according to the protocol's rules. + /// The verification process typically involves examining 1b messages from other parties, contained + /// within the `HashMap`. + /// + /// # Parameters + /// - `v`: The value to verify. + /// - `m`: A `HashMap` mapping party IDs (`u64`) to their corresponding optional values (`Option`). + /// + /// # Returns + /// `true` if the value is correctly selected; otherwise, `false`. + fn verify(&self, v: &V, m: &HashMap>) -> bool; + + /// Selects a value based on internal conditions and messages from other parties. + /// + /// This method determines the value to be selected for the consensus process, based on the state + /// of 1b messages received from other parties. The selection must comply with the BPCon protocol's + /// safety and consistency requirements. + /// + /// # Parameters + /// - `m`: A `HashMap` mapping party IDs (`u64`) to their corresponding optional values (`Option`). + /// + /// # Returns + /// The selected value of type `V`. + fn select(&self, m: &HashMap>) -> V; +}