diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 0000000..b7b02e4 --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,52 @@ +name: Deploy Rust Docs to GitHub Pages + +on: + push: + branches: + - main + +concurrency: + group: "pages" + cancel-in-progress: false + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Install stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Build Rust Documentation + run: | + cargo doc --no-deps --document-private-items + echo "" > target/doc/index.html + + - name: Upload artifact + uses: actions/upload-pages-artifact@v3 + with: + path: ./target/doc + + deploy: + runs-on: ubuntu-latest + needs: build + + permissions: + pages: write + id-token: write + + environment: + name: docs + url: ${{ steps.deployment.outputs.page_url }} + + steps: + - name: Deploy to GitHub Pages + id: deployment + uses: actions/deploy-pages@v4 diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..f8d9382 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,135 @@ +name: Rust CI + +on: + pull_request: + branches: + - main + push: + branches: + - main + workflow_dispatch: + +permissions: + contents: read + +jobs: + check: + name: Check + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Cache Cargo registry + uses: actions/cache@v4 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache Cargo build + uses: actions/cache@v4 + with: + path: target + key: ${{ runner.os }}-cargo-build-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-build- + + - name: Install stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Run cargo check + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: check + + test: + name: Test Suite + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + rust: [stable, beta, nightly] + runs-on: ${{ matrix.os }} + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Cache Cargo registry + uses: actions/cache@v4 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache Cargo build + uses: actions/cache@v4 + with: + path: target + key: ${{ runner.os }}-cargo-build-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-build- + + - name: Install ${{ matrix.rust }} toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: ${{ matrix.rust }} + + - name: Run cargo test + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: test + args: --all-features --verbose + + lints: + name: Lints + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Cache Cargo registry + uses: actions/cache@v4 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache Cargo build + uses: actions/cache@v4 + with: + path: target + key: ${{ runner.os }}-cargo-build-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-build- + + - name: Install stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + components: rustfmt, clippy + + - name: Run cargo fmt + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: fmt + args: --all -- --check + + - name: Run cargo clippy + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: clippy + args: --all-targets -- -D warnings diff --git a/Cargo.toml b/Cargo.toml index 287eb66..4383852 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,4 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0.204", features = ["derive"] } +rand = "0.9.0-alpha.2" +log = "0.4.22" +rkyv = { version = "0.7.44", features = ["validation"]} +serde = { version = "1.0.207", features = ["derive"] } +bincode = "1.3.3" diff --git a/README.md b/README.md new file mode 100644 index 0000000..6d8f44a --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +# BPCon Rust Library + +[![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0) + +This is a generic rust implementation of the `BPCon` consensus mechanism. + +## Library Structure + +### src/party.rs + +Main entity in this implementation is `Party` - it represents member of the consensus. + +External system shall create desired amount of parties. + +We have 2 communication channels - one for sending `MessageWire` - encoded in bytes message and routing information, +and the other for pitching consensus events - this allows for external system to impose custom limitations and rules +regarding runway. + +### src/message.rs + +Definitions of the general message struct, routing information and type-specific contents. + +### src/lib.rs + +Here we present a trait for the value on which consensus is being conducted. Additionally, there is a trait for +defining custom value selection rules, called `ValueSelector`. + diff --git a/src/error.rs b/src/error.rs index 032fdc9..b9d84b8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,56 @@ //! Definition of the BPCon errors. + +use std::fmt; + +#[derive(Debug)] pub enum BallotError { - // TODO: define errors. -} \ No newline at end of file + MessageParsing(String), + ValueParsing(String), + InvalidState(String), + Communication(String), + LeaderElection(String), +} + +impl fmt::Display for BallotError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + BallotError::MessageParsing(ref err) => write!(f, "Message parsing error: {}", err), + BallotError::ValueParsing(ref err) => write!(f, "Value parsing error: {}", err), + BallotError::InvalidState(ref err) => write!(f, "Invalid state error: {}", err), + BallotError::Communication(ref err) => write!(f, "Communication error: {}", err), + BallotError::LeaderElection(ref err) => write!(f, "Leader election error: {}", err), + } + } +} + +impl std::error::Error for BallotError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + // Since these are all simple String errors, there is no underlying source error. + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ballot_error_message_parsing() { + let error = BallotError::MessageParsing("Parsing failed".into()); + if let BallotError::MessageParsing(msg) = error { + assert_eq!(msg, "Parsing failed"); + } else { + panic!("Expected MessageParsing error"); + } + } + + #[test] + fn test_ballot_error_invalid_state() { + let error = BallotError::InvalidState("Invalid state transition".into()); + if let BallotError::InvalidState(msg) = error { + assert_eq!(msg, "Invalid state transition"); + } else { + panic!("Expected InvalidState error"); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index b8fa6a5..afa03b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,18 +1,23 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +mod error; pub mod message; pub mod party; -mod error; /// General trait for value itself. -pub trait Value: Eq {} +pub trait Value: Eq + Serialize + for<'a> Deserialize<'a> + Clone {} /// Trait for value selector and verificator. /// Value selection and verification may depend on different conditions for different values. +/// Note that value selection should follow the rules of BPCon: only safe values can be selected. +/// Party can not vote for different values, even in different ballots. pub trait ValueSelector { - /// Verifies if a value is selected correctly. - fn verify(v: V) -> bool; + /// Verifies if a value is selected correctly. Accepts 2b messages from parties. + fn verify(&self, v: &V, m: &HashMap>) -> bool; - /// Select value depending on inner conditions. - fn select() -> V; + /// Select value depending on inner conditions. Accepts 2b messages from parties. + fn select(&self, m: &HashMap>) -> V; // TODO: add other fields to update selector state. } diff --git a/src/message.rs b/src/message.rs index eb7c68e..daa86ab 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,32 +1,11 @@ -//! Definition of the BPCon message trait and enum. +//! Definition of the BPCon messages. -pub mod msg1a; -pub mod msg1b; -pub mod msg1c; -pub mod msg2a; -pub mod msg2av; -pub mod msg2b; - -use serde::{Deserialize, Serialize}; - -/// Generic communicative unit in ballot. -pub trait Message: Serialize + for<'a> Deserialize<'a> { - /// Which participant this message came from. - fn get_sender_id(&self) -> u64; - /// Where this message should be delivered. - fn get_receivers_id(&self) -> Vec; - /// Indicates whether this message shall be broadcast to other participants. Can be empty if `is_broadcast` is `true` - fn is_broadcast(&self) -> bool; - /// Encode inner message to bytes and receive routing information. - fn msg_routing(&self) -> MessageRouting; - /// Returns the BPCon message type. - fn msg_type(&self) -> ProtocolMessage; -} +use rkyv::{AlignedVec, Archive, Deserialize, Serialize}; /// Message ready for transfer. -pub struct MessageWire{ +pub struct MessagePacket { /// Serialized message contents. - pub content_bytes: Vec, + pub content_bytes: AlignedVec, /// Routing information. pub routing: MessageRouting, } @@ -47,8 +26,104 @@ pub struct MessageRouting { pub enum ProtocolMessage { Msg1a, Msg1b, - Msg1c, Msg2a, Msg2av, Msg2b, -} \ No newline at end of file +} + +// Value in messages is stored in serialized format, i.e bytes in order to omit +// strict restriction for `Value` trait to be [de]serializable only with `rkyv`. + +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] +pub struct Message1aContent { + pub ballot: u64, +} + +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] +pub struct Message1bContent { + pub ballot: u64, + pub last_ballot_voted: Option, + pub last_value_voted: Option>, +} + +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] +pub struct Message2aContent { + pub ballot: u64, + pub value: Vec, +} + +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] +pub struct Message2avContent { + pub ballot: u64, + pub received_value: Vec, +} + +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] +pub struct Message2bContent { + pub ballot: u64, +} + +impl Message1aContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg1a, + } + } +} + +impl Message1bContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg1b, + } + } +} + +impl Message2aContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg2a, + } + } +} + +impl Message2avContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg2av, + } + } +} + +impl Message2bContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg2b, + } + } +} diff --git a/src/message/msg1a.rs b/src/message/msg1a.rs deleted file mode 100644 index cb9ad9f..0000000 --- a/src/message/msg1a.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1a {} - -impl Message for Message1a { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg1b.rs b/src/message/msg1b.rs deleted file mode 100644 index 8e9ab23..0000000 --- a/src/message/msg1b.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1b {} - -impl Message for Message1b { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg1c.rs b/src/message/msg1c.rs deleted file mode 100644 index bb91682..0000000 --- a/src/message/msg1c.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1c {} - -impl Message for Message1c { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg2a.rs b/src/message/msg2a.rs deleted file mode 100644 index 9a97d74..0000000 --- a/src/message/msg2a.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2a {} - -impl Message for Message2a { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg2av.rs b/src/message/msg2av.rs deleted file mode 100644 index bd0c5a8..0000000 --- a/src/message/msg2av.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2av {} - -impl Message for Message2av { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg2b.rs b/src/message/msg2b.rs deleted file mode 100644 index 0c3f9b5..0000000 --- a/src/message/msg2b.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2b {} - -impl Message for Message2b { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/party.rs b/src/party.rs index 4be1131..85f6b6f 100644 --- a/src/party.rs +++ b/src/party.rs @@ -1,13 +1,145 @@ //! Definition of the BPCon participant structure. -use std::sync::mpsc::{Receiver, Sender}; -use crate::{Value, ValueSelector}; use crate::error::BallotError; -use crate::message::{MessageRouting, MessageWire, ProtocolMessage}; +use crate::message::{ + Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent, + MessagePacket, MessageRouting, ProtocolMessage, +}; +use crate::{Value, ValueSelector}; +use rand::prelude::StdRng; +use rand::prelude::*; +use rand::Rng; +use rkyv::{AlignedVec, Deserialize, Infallible}; +use std::cmp::PartialEq; +use std::collections::hash_map::DefaultHasher; +use std::collections::hash_map::Entry::Vacant; +use std::collections::{HashMap, HashSet}; +use std::hash::{Hash, Hasher}; +use std::sync::mpsc::{channel, Receiver, Sender}; + +/// BPCon configuration. Includes ballot time bounds and other stuff. +pub struct BPConConfig { + /// Parties weights: `party_weights[i]` corresponds to the i-th party weight + pub party_weights: Vec, + + /// Threshold weight to define BFT quorum: should be > 2/3 of total weight + pub threshold: u128, + + /// Leader of the ballot, computed using seed obtained from config. + leader: u64, + // TODO: define other config fields. +} + +impl BPConConfig { + /// Create new config instance. + pub fn new(party_weights: Vec, threshold: u128) -> Self { + let mut cfg = Self { + party_weights, + threshold, + leader: 0, + }; + cfg.leader = cfg.compute_leader().unwrap(); + + cfg + } + + /// Compute leader in a weighed randomized manner. + /// Uses seed from the config, making it deterministic. + fn compute_leader(&self) -> Result { + let seed = self.compute_seed(); + + let total_weight: u64 = self.party_weights.iter().sum(); + if total_weight == 0 { + return Err(BallotError::LeaderElection("Zero weight sum".into())); + } + + // Use the seed from the config to create a deterministic random number generator. + let mut rng = StdRng::seed_from_u64(seed); + + let random_value: u64 = rng.gen_range(0..total_weight); + + let mut cumulative_weight = 0; + for (i, &weight) in self.party_weights.iter().enumerate() { + cumulative_weight += weight; + if random_value < cumulative_weight { + return Ok(i as u64); + } + } + Err(BallotError::LeaderElection("Election failed".into())) + } + + /// Compute seed for randomized leader election. + fn compute_seed(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + + // Hash each field that should contribute to the seed + self.party_weights.hash(&mut hasher); + self.threshold.hash(&mut hasher); + + // You can add more fields as needed + + // Generate the seed from the hash + hasher.finish() + } +} -/// BPCon configuration. Includes ballot time bounds, and other stuff. -pub struct BallotConfig { - // TODO: define config fields. +/// Party status defines the statuses of the ballot for the particular participant +/// depending on local calculations. +#[derive(PartialEq, Debug)] +pub(crate) enum PartyStatus { + None, + Launched, + Passed1a, + Passed1b, + Passed2a, + Passed2av, + Passed2b, + Finished, + Failed, +} + +/// Party events is used for the ballot flow control. +#[derive(PartialEq)] +pub(crate) enum PartyEvent { + Launch1a, + Launch1b, + Launch2a, + Launch2av, + Launch2b, + Finalize, +} + +/// A struct to keep track of senders and the cumulative weight of their messages. +struct MessageRoundState { + senders: HashSet, + weight: u128, +} + +impl MessageRoundState { + /// Creates a new instance of `MessageRoundState`. + fn new() -> Self { + Self { + senders: HashSet::new(), + weight: 0, + } + } + + /// Adds a sender and their corresponding weight. + fn add_sender(&mut self, sender: u64, weight: u128) { + self.senders.insert(sender); + self.weight += weight; + } + + /// Checks if the sender has already sent a message. + fn contains_sender(&self, sender: &u64) -> bool { + self.senders.contains(sender) + } + + /// Resets the state. + fn reset(&mut self) { + self.senders.clear(); + self.weight = 0; + } } /// Party of the BPCon protocol that executes ballot. @@ -24,67 +156,874 @@ pub struct BallotConfig { pub struct Party> { /// This party's identifier. pub id: u64, - /// Other ballot parties' ids. - pub party_ids: Vec, /// Communication queues. - in_receiver: Receiver, - out_sender: Sender, + msg_in_receiver: Receiver, + msg_out_sender: Sender, - /// Query to submit result. - value_sender: Sender>, + /// Query to receive and send events that run ballot protocol + event_receiver: Receiver, + event_sender: Sender, - /// Ballot config (e.g. ballot time bounds). - cfg: BallotConfig, + /// BPCon config (e.g. ballot time bounds, parties weights, etc.). + cfg: BPConConfig, /// Main functional for value selection. value_selector: VS, - // TODO: define other state fields if needed. + /// Status of the ballot execution + status: PartyStatus, + + /// Current ballot number + ballot: u64, + + /// Last ballot where party submitted 2b message + last_ballot_voted: Option, + + /// Last value for which party submitted 2b message + last_value_voted: Option, + + /// Local round fields + + /// 1b round state + /// + parties_voted_before: HashMap>, // id <-> value + messages_1b_weight: u128, + + /// 2a round state + /// + value_2a: Option, + + /// 2av round state + /// + messages_2av_state: MessageRoundState, + + /// 2b round state + /// + messages_2b_state: MessageRoundState, } impl> Party { pub fn new( id: u64, - party_ids: Vec, - in_receiver: Receiver, - out_sender: Sender, - value_sender: Sender>, - cfg: BallotConfig, + cfg: BPConConfig, value_selector: VS, - ) -> Self { - Self { - id, - party_ids, - in_receiver, - out_sender, - value_sender, - cfg, - value_selector, + ) -> (Self, Receiver, Sender) { + let (event_sender, event_receiver) = channel(); + let (msg_in_sender, msg_in_receiver) = channel(); + let (msg_out_sender, msg_out_receiver) = channel(); + + ( + Self { + id, + msg_in_receiver, + msg_out_sender, + event_receiver, + event_sender, + cfg, + value_selector, + status: PartyStatus::None, + ballot: 0, + last_ballot_voted: None, + last_value_voted: None, + parties_voted_before: HashMap::new(), + messages_1b_weight: 0, + value_2a: None, + messages_2av_state: MessageRoundState::new(), + messages_2b_state: MessageRoundState::new(), + }, + msg_out_receiver, + msg_in_sender, + ) + } + + pub fn ballot(&self) -> u64 { + self.ballot + } + + pub fn is_launched(&self) -> bool { + !self.is_stopped() + } + + pub fn is_stopped(&self) -> bool { + self.status == PartyStatus::Finished || self.status == PartyStatus::Failed + } + + pub fn get_value_selected(&self) -> Option { + // Only `Finished` status means reached BFT agreement + if self.status == PartyStatus::Finished { + return self.value_2a.clone(); + } + + None + } + + fn get_value(&self) -> V { + self.value_selector.select(&self.parties_voted_before) + } + + /// Start the next ballot. It's expected from the external system to re-run ballot protocol in + /// case of failed ballot. + pub async fn launch_ballot(&mut self) -> Result, BallotError> { + self.prepare_next_ballot(); + + while self.is_launched() { + if let Ok(msg_wire) = self.msg_in_receiver.try_recv() { + if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { + self.status = PartyStatus::Failed; + return Err(err); + } + } + + if let Ok(event) = self.event_receiver.try_recv() { + if let Err(err) = self.follow_event(event) { + self.status = PartyStatus::Failed; + return Err(err); + } + } + + // TODO: Emit events to run ballot protocol according to the ballot configuration + self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch1a event".into()) + })?; + + self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch1b event".into()) + })?; + + self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2a event".into()) + })?; + + self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2av event".into()) + })?; + + self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Launch2b event".into()) + })?; + + self.event_sender.send(PartyEvent::Finalize).map_err(|_| { + self.status = PartyStatus::Failed; + BallotError::Communication("Failed to send Finalize event".into()) + })?; } + + Ok(self.get_value_selected()) } - /// Start the party. - pub fn start(&mut self) { - // TODO: launch party + /// Prepare state before running a ballot. + fn prepare_next_ballot(&mut self) { + self.status = PartyStatus::None; + self.ballot += 1; + + // Clean state + self.parties_voted_before = HashMap::new(); + self.messages_1b_weight = 0; + self.value_2a = None; + self.messages_2av_state.reset(); + self.messages_2b_state.reset(); + + // Cleaning channels + while self.event_receiver.try_recv().is_ok() {} + while self.msg_in_receiver.try_recv().is_ok() {} + + self.status = PartyStatus::Launched; } /// Update party's state based on message type. - fn update_state(&mut self, msg: Vec, routing: MessageRouting) { - // TODO: Implement according to protocol rules. + fn update_state(&mut self, m: AlignedVec, routing: MessageRouting) -> Result<(), BallotError> { match routing.msg_type { - ProtocolMessage::Msg1a => {} - ProtocolMessage::Msg1b => {} - ProtocolMessage::Msg1c => {} - ProtocolMessage::Msg2a => {} - ProtocolMessage::Msg2av => {} - ProtocolMessage::Msg2b => {} + ProtocolMessage::Msg1a => { + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message1aContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; + + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg1a".into(), + )); + } + + if routing.sender != self.cfg.leader { + return Err(BallotError::InvalidState("Invalid leader in Msg1a".into())); + } + + self.status = PartyStatus::Passed1a; + } + ProtocolMessage::Msg1b => { + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message1bContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; + + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg1b".into(), + )); + } + + if let Some(last_ballot_voted) = msg.last_ballot_voted { + if last_ballot_voted >= self.ballot { + return Err(BallotError::InvalidState( + "Received outdated 1b message".into(), + )); + } + } + + if let Vacant(e) = self.parties_voted_before.entry(routing.sender) { + let value: Option = match msg.last_value_voted { + Some(ref data) => Some(bincode::deserialize(data).map_err(|err| { + BallotError::ValueParsing(format!("Deserialization error: {:?}", err)) + })?), + None => None, + }; + + e.insert(value); + + self.messages_1b_weight += + self.cfg.party_weights[routing.sender as usize] as u128; + + if self.messages_1b_weight > self.cfg.threshold { + self.status = PartyStatus::Passed1b; + } + } + } + ProtocolMessage::Msg2a => { + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message2aContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; + + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg2a".into(), + )); + } + + if routing.sender != self.cfg.leader { + return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); + } + + let value_received = bincode::deserialize(&msg.value[..]).map_err(|err| { + BallotError::ValueParsing(format!("Failed to parse value in Msg2a: {:?}", err)) + })?; + + if self + .value_selector + .verify(&value_received, &self.parties_voted_before) + { + self.status = PartyStatus::Passed2a; + self.value_2a = Some(value_received); + } else { + return Err(BallotError::InvalidState( + "Failed to verify value in Msg2a".into(), + )); + } + } + ProtocolMessage::Msg2av => { + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message2avContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; + + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg2av".into(), + )); + } + let value_received: V = + bincode::deserialize(&msg.received_value[..]).map_err(|err| { + BallotError::ValueParsing(format!( + "Failed to parse value in Msg2av: {:?}", + err + )) + })?; + + if value_received != self.value_2a.clone().unwrap() { + return Err(BallotError::InvalidState( + "Received different value in Msg2av".into(), + )); + } + + if !self.messages_2av_state.contains_sender(&routing.sender) { + self.messages_2av_state.add_sender( + routing.sender, + self.cfg.party_weights[routing.sender as usize] as u128, + ); + + if self.messages_2av_state.weight > self.cfg.threshold { + self.status = PartyStatus::Passed2av; + } + } + } + ProtocolMessage::Msg2b => { + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message2bContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; + + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg2b".into(), + )); + } + + if self.messages_2av_state.contains_sender(&routing.sender) + && !self.messages_2b_state.contains_sender(&routing.sender) + { + self.messages_2b_state.add_sender( + routing.sender, + self.cfg.party_weights[routing.sender as usize] as u128, + ); + + if self.messages_2b_state.weight > self.cfg.threshold { + self.status = PartyStatus::Passed2b; + } + } + } + } + Ok(()) + } + + /// Executes ballot actions according to the received event. + fn follow_event(&mut self, event: PartyEvent) -> Result<(), BallotError> { + match event { + PartyEvent::Launch1a => { + if self.status != PartyStatus::Launched { + return Err(BallotError::InvalidState( + "Cannot launch 1a, incorrect state".into(), + )); + } + if self.cfg.leader == self.id { + self.msg_out_sender + .send(MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&Message1aContent { + ballot: self.ballot, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg1a".into()) + })?, + routing: Message1aContent::get_routing(self.id), + }) + .map_err(|_| BallotError::Communication("Failed to send Msg1a".into()))?; + } + } + PartyEvent::Launch1b => { + if self.status != PartyStatus::Passed1a { + return Err(BallotError::InvalidState( + "Cannot launch 1b, incorrect state".into(), + )); + } + self.msg_out_sender + .send(MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&Message1bContent { + ballot: self.ballot, + last_ballot_voted: self.last_ballot_voted, + last_value_voted: self + .last_value_voted + .clone() + .map(|inner_data| { + bincode::serialize(&inner_data).map_err(|_| { + BallotError::ValueParsing( + "Failed to serialize value".into(), + ) + }) + }) + .transpose()?, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg1b".into()) + })?, + routing: Message1bContent::get_routing(self.id), + }) + .map_err(|_| BallotError::Communication("Failed to send Msg1b".into()))?; + } + PartyEvent::Launch2a => { + if self.status != PartyStatus::Passed1b { + return Err(BallotError::InvalidState( + "Cannot launch 2a, incorrect state".into(), + )); + } + if self.cfg.leader == self.id { + self.msg_out_sender + .send(MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&Message2aContent { + ballot: self.ballot, + value: bincode::serialize(&self.get_value()).map_err(|_| { + BallotError::ValueParsing("Failed to serialize value".into()) + })?, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg2a".into()) + })?, + routing: Message2aContent::get_routing(self.id), + }) + .map_err(|_| BallotError::Communication("Failed to send Msg2a".into()))?; + } + } + PartyEvent::Launch2av => { + if self.status != PartyStatus::Passed2a { + return Err(BallotError::InvalidState( + "Cannot launch 2av, incorrect state".into(), + )); + } + self.msg_out_sender + .send(MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&Message2avContent { + ballot: self.ballot, + received_value: bincode::serialize(&self.value_2a.clone()).map_err( + |_| BallotError::ValueParsing("Failed to serialize value".into()), + )?, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg2av".into()) + })?, + routing: Message2avContent::get_routing(self.id), + }) + .map_err(|_| BallotError::Communication("Failed to send Msg2av".into()))?; + } + PartyEvent::Launch2b => { + if self.status != PartyStatus::Passed2av { + return Err(BallotError::InvalidState( + "Cannot launch 2b, incorrect state".into(), + )); + } + self.msg_out_sender + .send(MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&Message2bContent { + ballot: self.ballot, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg2b".into()) + })?, + routing: Message2bContent::get_routing(self.id), + }) + .map_err(|_| BallotError::Communication("Failed to send Msg2b".into()))?; + } + PartyEvent::Finalize => { + if self.status != PartyStatus::Passed2b { + return Err(BallotError::InvalidState( + "Cannot finalize, incorrect state".into(), + )); + } + self.status = PartyStatus::Finished; + } } + Ok(()) } } -impl> Drop for Party { - fn drop(&mut self) { - // TODO: stop party. +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::HashMap; + + // Mock implementation of Value + #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] + struct MockValue(u64); // Simple mock type wrapping an integer + + impl Value for MockValue {} + + // Mock implementation of ValueSelector + struct MockValueSelector; + + impl ValueSelector for MockValueSelector { + fn verify(&self, _v: &MockValue, _m: &HashMap>) -> bool { + true // For testing, always return true + } + + fn select(&self, _m: &HashMap>) -> MockValue { + MockValue(42) // For testing, always return the same value + } + } + + #[test] + fn test_compute_leader_determinism() { + let party_weights = vec![1, 2, 7]; // Weighted case + let threshold = 7; // example threshold + + // Initialize the configuration once + let config = BPConConfig::new(party_weights.clone(), threshold); + + // Compute the leader multiple times + let leader1 = config.compute_leader().unwrap(); + let leader2 = config.compute_leader().unwrap(); + let leader3 = config.compute_leader().unwrap(); + + // All leaders should be the same due to deterministic seed + assert_eq!( + leader1, leader2, + "Leaders should be consistent on repeated calls" + ); + assert_eq!( + leader2, leader3, + "Leaders should be consistent on repeated calls" + ); + } + + #[test] + #[should_panic] + fn test_compute_leader_zero_weights() { + let party_weights = vec![0, 0, 0]; + let threshold = 1; // example threshold + + // Create the config, which will attempt to compute the leader + BPConConfig::new(party_weights, threshold); + } + + #[test] + fn test_update_state_msg1a() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Launched; + party.ballot = 1; + + let msg = Message1aContent { ballot: 1 }; + let routing = MessageRouting { + sender: 1, + receivers: vec![2, 3], + is_broadcast: false, + msg_type: ProtocolMessage::Msg1a, + }; + + let msg_wire = MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&msg).unwrap(), + routing, + }; + + party + .update_state(msg_wire.content_bytes, msg_wire.routing) + .unwrap(); + assert_eq!(party.status, PartyStatus::Passed1a); + } + + #[test] + fn test_update_state_msg1b() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], // Total weight is 6 + threshold: 4, // Threshold is 4 + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Passed1a; + party.ballot = 1; + + // First, send a 1b message from party 1 (weight 2) + let msg1 = Message1bContent { + ballot: 1, + last_ballot_voted: Some(0), + last_value_voted: bincode::serialize(&MockValue(42)).ok(), + }; + let routing1 = MessageRouting { + sender: 1, // Party 1 sends the message + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg1b, + }; + + let msg_wire1 = MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), + routing: routing1, + }; + + party + .update_state(msg_wire1.content_bytes, msg_wire1.routing) + .unwrap(); + + // Now, send a 1b message from party 2 (weight 3) + let msg2 = Message1bContent { + ballot: 1, + last_ballot_voted: Some(0), + last_value_voted: bincode::serialize(&MockValue(42)).ok(), + }; + let routing2 = MessageRouting { + sender: 2, // Party 2 sends the message + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg1b, + }; + + let msg_wire2 = MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), + routing: routing2, + }; + + party + .update_state(msg_wire2.content_bytes, msg_wire2.routing) + .unwrap(); + + // After both messages, the cumulative weight is 2 + 3 = 5, which exceeds the threshold + assert_eq!(party.status, PartyStatus::Passed1b); + } + + #[test] + fn test_update_state_msg2a() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Passed1b; + party.ballot = 1; + + let msg = Message2aContent { + ballot: 1, + value: bincode::serialize(&MockValue(42)).unwrap(), + }; + let routing = MessageRouting { + sender: 1, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2a, + }; + + let msg_wire = MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&msg).unwrap(), + routing, + }; + + party + .update_state(msg_wire.content_bytes, msg_wire.routing) + .unwrap(); + + assert_eq!(party.status, PartyStatus::Passed2a); + } + + #[test] + fn test_update_state_msg2av() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Passed2a; + party.ballot = 1; + party.value_2a = Some(MockValue(42)); + + // Send first 2av message from party 2 (weight 3) + let msg1 = Message2avContent { + ballot: 1, + received_value: bincode::serialize(&MockValue(42)).unwrap(), + }; + let routing1 = MessageRouting { + sender: 2, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2av, + }; + + let msg_wire1 = MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), + routing: routing1, + }; + + party + .update_state(msg_wire1.content_bytes, msg_wire1.routing) + .unwrap(); + + // Now send a second 2av message from party 1 (weight 2) + let msg2 = Message2avContent { + ballot: 1, + received_value: bincode::serialize(&MockValue(42)).unwrap(), + }; + let routing2 = MessageRouting { + sender: 1, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2av, + }; + + let msg_wire2 = MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), + routing: routing2, + }; + + party + .update_state(msg_wire2.content_bytes, msg_wire2.routing) + .unwrap(); + + // The cumulative weight (3 + 2) should exceed the threshold of 4 + assert_eq!(party.status, PartyStatus::Passed2av); + } + + #[test] + fn test_update_state_msg2b() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Passed2av; + party.ballot = 1; + + // Simulate that both party 2 and party 1 already sent 2av messages + party.messages_2av_state.add_sender(1, 1); + party.messages_2av_state.add_sender(2, 2); + + // Send first 2b message from party 2 (weight 3) + let msg1 = Message2bContent { ballot: 1 }; + let routing1 = MessageRouting { + sender: 2, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2b, + }; + + let msg_wire1 = MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), + routing: routing1, + }; + + party + .update_state(msg_wire1.content_bytes, msg_wire1.routing) + .unwrap(); + + // Print the current state and weight + println!( + "After first Msg2b: Status = {:?}, 2b Weight = {}", + party.status, party.messages_2b_state.weight + ); + + // Now send a second 2b message from party 1 (weight 2) + let msg2 = Message2bContent { ballot: 1 }; + let routing2 = MessageRouting { + sender: 1, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2b, + }; + + let msg_wire2 = MessagePacket { + content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), + routing: routing2, + }; + + party + .update_state(msg_wire2.content_bytes, msg_wire2.routing) + .unwrap(); + + // Print the current state and weight + println!( + "After second Msg2b: Status = {:?}, 2b Weight = {}", + party.status, party.messages_2b_state.weight + ); + + // The cumulative weight (3 + 2) should exceed the threshold of 4 + assert_eq!(party.status, PartyStatus::Passed2b); + } + + #[test] + fn test_follow_event_launch1a() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 0, + }; + let (mut party, _msg_out_receiver, _msg_in_sender) = + Party::::new(0, cfg, MockValueSelector); + + party.status = PartyStatus::Launched; + party.ballot = 1; + + party + .follow_event(PartyEvent::Launch1a) + .expect("Failed to follow Launch1a event"); + + // If the party is the leader and in the Launched state, the event should trigger a message. + assert_eq!(party.status, PartyStatus::Launched); // Status remains Launched, as no state change expected here + } + + #[test] + fn test_ballot_reset_after_failure() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 0, + }; + let (mut party, _, _) = + Party::::new(0, cfg, MockValueSelector); + + party.status = PartyStatus::Failed; + party.ballot = 1; + + party.prepare_next_ballot(); + + // Check that state has been reset + assert_eq!(party.status, PartyStatus::Launched); + assert_eq!(party.ballot, 2); // Ballot number should have incremented + assert!(party.parties_voted_before.is_empty()); + assert_eq!(party.messages_1b_weight, 0); + assert!(party.messages_2av_state.senders.is_empty()); + assert_eq!(party.messages_2av_state.weight, 0); + assert!(party.messages_2b_state.senders.is_empty()); + assert_eq!(party.messages_2b_state.weight, 0); } -} \ No newline at end of file + + #[test] + fn test_follow_event_communication_failure() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 0, + }; + let (mut party, msg_out_receiver, _) = + Party::::new(0, cfg, MockValueSelector); + + party.status = PartyStatus::Launched; + party.ballot = 1; + + drop(msg_out_receiver); // Drop the receiver to simulate a communication failure + + let result = party.follow_event(PartyEvent::Launch1a); + + match result { + Err(BallotError::Communication(err_msg)) => { + assert_eq!( + err_msg, "Failed to send Msg1a", + "Expected specific communication error message" + ); + } + _ => panic!("Expected BallotError::Communication, got {:?}", result), + } + } +}