From ae28f96d62def7bdc3c7543241698ca980ad226e Mon Sep 17 00:00:00 2001 From: olegfomenko Date: Fri, 2 Aug 2024 20:06:16 +0300 Subject: [PATCH 01/21] adding structure for ballot --- Cargo.toml | 1 + src/message.rs | 45 ++++++------- src/message/msg1a.rs | 29 --------- src/message/msg1b.rs | 29 --------- src/message/msg1c.rs | 29 --------- src/message/msg2a.rs | 29 --------- src/message/msg2av.rs | 29 --------- src/message/msg2b.rs | 29 --------- src/party.rs | 145 ++++++++++++++++++++++++++++++++++-------- 9 files changed, 142 insertions(+), 223 deletions(-) delete mode 100644 src/message/msg1a.rs delete mode 100644 src/message/msg1b.rs delete mode 100644 src/message/msg1c.rs delete mode 100644 src/message/msg2a.rs delete mode 100644 src/message/msg2av.rs delete mode 100644 src/message/msg2b.rs diff --git a/Cargo.toml b/Cargo.toml index 287eb66..e6fc552 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,4 @@ edition = "2021" [dependencies] serde = { version = "1.0.204", features = ["derive"] } +serde_json = "1.0.122" diff --git a/src/message.rs b/src/message.rs index eb7c68e..7a7472a 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,30 +1,9 @@ -//! Definition of the BPCon message trait and enum. - -pub mod msg1a; -pub mod msg1b; -pub mod msg1c; -pub mod msg2a; -pub mod msg2av; -pub mod msg2b; +//! Definition of the BPCon messages. use serde::{Deserialize, Serialize}; -/// Generic communicative unit in ballot. -pub trait Message: Serialize + for<'a> Deserialize<'a> { - /// Which participant this message came from. - fn get_sender_id(&self) -> u64; - /// Where this message should be delivered. - fn get_receivers_id(&self) -> Vec; - /// Indicates whether this message shall be broadcast to other participants. Can be empty if `is_broadcast` is `true` - fn is_broadcast(&self) -> bool; - /// Encode inner message to bytes and receive routing information. - fn msg_routing(&self) -> MessageRouting; - /// Returns the BPCon message type. - fn msg_type(&self) -> ProtocolMessage; -} - /// Message ready for transfer. -pub struct MessageWire{ +pub struct MessageWire { /// Serialized message contents. pub content_bytes: Vec, /// Routing information. @@ -51,4 +30,22 @@ pub enum ProtocolMessage { Msg2a, Msg2av, Msg2b, -} \ No newline at end of file +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Message1aContent {} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Message1bContent {} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Message1cContent {} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Message2aContent {} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Message2avContent {} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Message2bContent {} \ No newline at end of file diff --git a/src/message/msg1a.rs b/src/message/msg1a.rs deleted file mode 100644 index cb9ad9f..0000000 --- a/src/message/msg1a.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1a {} - -impl Message for Message1a { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg1b.rs b/src/message/msg1b.rs deleted file mode 100644 index 8e9ab23..0000000 --- a/src/message/msg1b.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1b {} - -impl Message for Message1b { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg1c.rs b/src/message/msg1c.rs deleted file mode 100644 index bb91682..0000000 --- a/src/message/msg1c.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1c {} - -impl Message for Message1c { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg2a.rs b/src/message/msg2a.rs deleted file mode 100644 index 9a97d74..0000000 --- a/src/message/msg2a.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2a {} - -impl Message for Message2a { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg2av.rs b/src/message/msg2av.rs deleted file mode 100644 index bd0c5a8..0000000 --- a/src/message/msg2av.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2av {} - -impl Message for Message2av { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/message/msg2b.rs b/src/message/msg2b.rs deleted file mode 100644 index 0c3f9b5..0000000 --- a/src/message/msg2b.rs +++ /dev/null @@ -1,29 +0,0 @@ -//! Definition of the BPCon messages implementation. - -use serde::{Deserialize, Serialize}; -use crate::message::{Message, MessageRouting, ProtocolMessage}; - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2b {} - -impl Message for Message2b { - fn get_sender_id(&self) -> u64 { - todo!() - } - - fn get_receivers_id(&self) -> Vec { - todo!() - } - - fn is_broadcast(&self) -> bool { - todo!() - } - - fn msg_routing(&self) -> MessageRouting { - todo!() - } - - fn msg_type(&self) -> ProtocolMessage { - todo!() - } -} diff --git a/src/party.rs b/src/party.rs index 4be1131..be5b6e9 100644 --- a/src/party.rs +++ b/src/party.rs @@ -1,15 +1,43 @@ //! Definition of the BPCon participant structure. -use std::sync::mpsc::{Receiver, Sender}; +use std::cmp::PartialEq; +use std::sync::mpsc::{channel, Receiver, Sender}; use crate::{Value, ValueSelector}; use crate::error::BallotError; -use crate::message::{MessageRouting, MessageWire, ProtocolMessage}; +use crate::message::{Message1aContent, Message1bContent, Message1cContent, Message2aContent, Message2avContent, Message2bContent, MessageRouting, MessageWire, ProtocolMessage}; /// BPCon configuration. Includes ballot time bounds, and other stuff. pub struct BallotConfig { // TODO: define config fields. } +/// Party status defines the statuses of the ballot for the particular participant +/// depending on local calculations. +#[derive(PartialEq)] +pub(crate) enum PartyStatus { + None, + Launched, + Passed1a, + Passed1b, + Passed1c, + Passed2a, + Finished, + Failed, + Stopped, +} + +/// Party events is used for the ballot flow control. +#[derive(PartialEq)] +pub(crate) enum PartyEvent { + Launch1a, + Launch1b, + Launch1c, + Launch2a, + Launch2av, + Launch2b, + Finalize, +} + /// Party of the BPCon protocol that executes ballot. /// /// The communication between party and external @@ -28,63 +56,130 @@ pub struct Party> { pub party_ids: Vec, /// Communication queues. - in_receiver: Receiver, - out_sender: Sender, + msg_in_receiver: Receiver, + msg_out_sender: Sender, /// Query to submit result. value_sender: Sender>, + /// Query to receive and send events that run ballot protocol + event_receiver: Receiver, + event_sender: Sender, + /// Ballot config (e.g. ballot time bounds). cfg: BallotConfig, /// Main functional for value selection. value_selector: VS, + /// Status of the ballot execution + status: PartyStatus, + // TODO: define other state fields if needed. } + impl> Party { pub fn new( id: u64, party_ids: Vec, - in_receiver: Receiver, - out_sender: Sender, value_sender: Sender>, cfg: BallotConfig, value_selector: VS, - ) -> Self { - Self { - id, - party_ids, - in_receiver, - out_sender, - value_sender, - cfg, - value_selector, - } + ) -> ( + Self, + Receiver, + Sender + ) { + let (event_sender, event_receiver) = channel(); + let (msg_in_sender, msg_in_receiver) = channel(); + let (msg_out_sender, msg_out_receiver) = channel(); + + ( + Self { + id, + party_ids, + msg_in_receiver, + msg_out_sender, + value_sender, + event_receiver, + event_sender, + cfg, + value_selector, + status: PartyStatus::None, + }, + msg_out_receiver, + msg_in_sender + ) + } + + pub fn is_launched(&self) -> bool { + return !self.is_stopped(); + } + + pub fn is_stopped(&self) -> bool { + return self.status == PartyStatus::Finished || self.status == PartyStatus::Failed || self.status == PartyStatus::Stopped; } /// Start the party. - pub fn start(&mut self) { - // TODO: launch party + pub async fn launch(&mut self) { + while self.is_launched() { + /// Check for new messages + if let Ok(msg_wire) = self.msg_in_receiver.try_recv() { + self.update_state(msg_wire.content_bytes, msg_wire.routing); + } + + /// Check for new events + if let Ok(event) = self.event_receiver.try_recv() { + self.follow_event(event); + } + + // TODO: emit events to run ballot protocol according to the ballot configuration `BallotConfig` + } } /// Update party's state based on message type. fn update_state(&mut self, msg: Vec, routing: MessageRouting) { // TODO: Implement according to protocol rules. match routing.msg_type { - ProtocolMessage::Msg1a => {} - ProtocolMessage::Msg1b => {} - ProtocolMessage::Msg1c => {} - ProtocolMessage::Msg2a => {} - ProtocolMessage::Msg2av => {} - ProtocolMessage::Msg2b => {} + ProtocolMessage::Msg1a => { + if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + } + ProtocolMessage::Msg1b => { + if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + } + ProtocolMessage::Msg1c => { + if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + } + ProtocolMessage::Msg2a => { + if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + } + ProtocolMessage::Msg2av => { + if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + } + ProtocolMessage::Msg2b => { + if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + } + } + } + + /// Executes ballot actions according to the received event. + fn follow_event(&mut self, event: PartyEvent) { + // TODO: Implement according to protocol rules. + match event { + PartyEvent::Launch1a => {} + PartyEvent::Launch1b => {} + PartyEvent::Launch1c => {} + PartyEvent::Launch2a => {} + PartyEvent::Launch2av => {} + PartyEvent::Launch2b => {} + PartyEvent::Finalize => {} } } } impl> Drop for Party { fn drop(&mut self) { - // TODO: stop party. + self.status = PartyStatus::Stopped } } \ No newline at end of file From 6f8779ab9a1fecbef9cd4374af7f66af0c900305 Mon Sep 17 00:00:00 2001 From: olegfomenko Date: Fri, 2 Aug 2024 21:45:13 +0300 Subject: [PATCH 02/21] adding ballot logic --- src/lib.rs | 15 ++- src/message.rs | 81 ++++++++++++-- src/party.rs | 284 +++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 337 insertions(+), 43 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b8fa6a5..8af143f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,18 +1,23 @@ +use std::collections::HashMap; +use serde::{Deserialize, Serialize}; + pub mod message; pub mod party; mod error; /// General trait for value itself. -pub trait Value: Eq {} +pub trait Value: Eq + Serialize + for<'a> Deserialize<'a> + Clone {} /// Trait for value selector and verificator. /// Value selection and verification may depend on different conditions for different values. +/// Note that value selection should follow the rules of BPCon: only safe values can be selected. +/// Party can not vote for different values, even in different ballots. pub trait ValueSelector { - /// Verifies if a value is selected correctly. - fn verify(v: V) -> bool; + /// Verifies if a value is selected correctly. Accepts 2b messages from parties. + fn verify(&self, v: &V, m: &HashMap>>) -> bool; - /// Select value depending on inner conditions. - fn select() -> V; + /// Select value depending on inner conditions. Accepts 2b messages from parties. + fn select(&self, m: &HashMap>>) -> V; // TODO: add other fields to update selector state. } diff --git a/src/message.rs b/src/message.rs index 7a7472a..d6b423b 100644 --- a/src/message.rs +++ b/src/message.rs @@ -26,26 +26,91 @@ pub struct MessageRouting { pub enum ProtocolMessage { Msg1a, Msg1b, - Msg1c, Msg2a, Msg2av, Msg2b, } #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1aContent {} +pub struct Message1aContent { + pub ballot: u64, +} #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1bContent {} +pub struct Message1bContent { + pub ballot: u64, + pub last_ballot_voted: Option, + pub last_value_voted: Option>, +} #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message1cContent {} +pub struct Message2aContent { + pub ballot: u64, + pub value: Vec, +} #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2aContent {} +pub struct Message2avContent { + pub ballot: u64, + pub received_value: Vec, +} #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2avContent {} +pub struct Message2bContent { + pub ballot: u64, +} -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct Message2bContent {} \ No newline at end of file +impl Message1aContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg1a, + } + } +} + +impl Message1bContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg1b, + } + } +} + +impl Message2aContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg2a, + } + } +} + +impl Message2avContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg2av, + } + } +} + +impl Message2bContent { + pub fn get_routing(id: u64) -> MessageRouting { + MessageRouting { + sender: id, + receivers: vec![], + is_broadcast: true, + msg_type: ProtocolMessage::Msg2b, + } + } +} \ No newline at end of file diff --git a/src/party.rs b/src/party.rs index be5b6e9..5eb5c98 100644 --- a/src/party.rs +++ b/src/party.rs @@ -1,14 +1,21 @@ //! Definition of the BPCon participant structure. use std::cmp::PartialEq; +use std::collections::{HashMap, HashSet}; use std::sync::mpsc::{channel, Receiver, Sender}; use crate::{Value, ValueSelector}; use crate::error::BallotError; -use crate::message::{Message1aContent, Message1bContent, Message1cContent, Message2aContent, Message2avContent, Message2bContent, MessageRouting, MessageWire, ProtocolMessage}; +use crate::message::{Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent, MessageRouting, MessageWire, ProtocolMessage}; /// BPCon configuration. Includes ballot time bounds, and other stuff. -pub struct BallotConfig { - // TODO: define config fields. +pub struct BPConConfig { + /// Parties weights: `party_weights[i]` corresponds to the i-th party weight + pub party_weights: Vec, + + /// Threshold weight to define BFT quorum: should be > 66.6% + pub threshold: u128, + + // TODO: define other config fields. } /// Party status defines the statuses of the ballot for the particular participant @@ -19,8 +26,9 @@ pub(crate) enum PartyStatus { Launched, Passed1a, Passed1b, - Passed1c, Passed2a, + Passed2av, + Passed2b, Finished, Failed, Stopped, @@ -31,7 +39,6 @@ pub(crate) enum PartyStatus { pub(crate) enum PartyEvent { Launch1a, Launch1b, - Launch1c, Launch2a, Launch2av, Launch2b, @@ -52,8 +59,6 @@ pub(crate) enum PartyEvent { pub struct Party> { /// This party's identifier. pub id: u64, - /// Other ballot parties' ids. - pub party_ids: Vec, /// Communication queues. msg_in_receiver: Receiver, @@ -66,8 +71,8 @@ pub struct Party> { event_receiver: Receiver, event_sender: Sender, - /// Ballot config (e.g. ballot time bounds). - cfg: BallotConfig, + /// BPCon config (e.g. ballot time bounds, parties weights, etc.). + cfg: BPConConfig, /// Main functional for value selection. value_selector: VS, @@ -75,16 +80,45 @@ pub struct Party> { /// Status of the ballot execution status: PartyStatus, + /// Current ballot number + ballot: u64, + + /// Last ballot where party submitted 2b message + last_ballot_voted: Option, + + /// Last value for which party submitted 2b message + last_value_voted: Option>, + // TODO: define other state fields if needed. + + + /// 1b round state + /// + parties_voted_before: HashMap>>, // id <-> value + messages_1b_weight: u128, + + /// 2a round state + /// + value_2a: Option, + + /// 2av round state + /// + messages_2av_senders: HashSet, + messages_2av_weight: u128, + + /// 2b round state + /// + messages_2b_senders: HashSet, + messages_2b_weight: u128, + } impl> Party { pub fn new( id: u64, - party_ids: Vec, value_sender: Sender>, - cfg: BallotConfig, + cfg: BPConConfig, value_selector: VS, ) -> ( Self, @@ -98,7 +132,6 @@ impl> Party { ( Self { id, - party_ids, msg_in_receiver, msg_out_sender, value_sender, @@ -107,6 +140,16 @@ impl> Party { cfg, value_selector, status: PartyStatus::None, + ballot: 0, + last_ballot_voted: None, + last_value_voted: None, + parties_voted_before: HashMap::new(), + messages_1b_weight: 0, + value_2a: None, + messages_2av_senders: HashSet::new(), + messages_2av_weight: 0, + messages_2b_senders: HashSet::new(), + messages_2b_weight: 0, }, msg_out_receiver, msg_in_sender @@ -121,8 +164,20 @@ impl> Party { return self.status == PartyStatus::Finished || self.status == PartyStatus::Failed || self.status == PartyStatus::Stopped; } - /// Start the party. - pub async fn launch(&mut self) { + pub fn get_leader(&self) -> u64 { + // TODO: implement weight random based on some conditions + todo!() + } + + pub fn get_value(&self) -> V { + return self.value_selector.select(&self.parties_voted_before); + } + + /// Start the next ballot. It's expected from the external system to re-run ballot protocol in + /// case of failed ballot. + pub async fn launch_ballot(&mut self) { + self.prepare_next_ballot(); + while self.is_launched() { /// Check for new messages if let Ok(msg_wire) = self.msg_in_receiver.try_recv() { @@ -138,27 +193,123 @@ impl> Party { } } + + /// Prepare state before running a ballot + fn prepare_next_ballot(&mut self) { + self.status = PartyStatus::None; + self.ballot = self.ballot + 1; + + // Clean state + self.parties_voted_before = HashMap::new(); + self.messages_1b_weight = 0; + self.value_2a = None; + self.messages_2av_senders = HashSet::new(); + self.messages_2av_weight = 0; + self.messages_2b_senders = HashSet::new(); + self.messages_2b_weight = 0; + + // Cleaning channels + while let Ok(_) = self.event_receiver.try_recv() {} + while let Ok(_) = self.msg_in_receiver.try_recv() {} + + self.status = PartyStatus::Launched; + } + /// Update party's state based on message type. - fn update_state(&mut self, msg: Vec, routing: MessageRouting) { + fn update_state(&mut self, m: Vec, routing: MessageRouting) { // TODO: Implement according to protocol rules. match routing.msg_type { ProtocolMessage::Msg1a => { - if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { + if msg.ballot != self.ballot { + return; + } + + if routing.sender != self.get_leader() { + return; + } + + self.status = PartyStatus::Passed1a; + } } ProtocolMessage::Msg1b => { - if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} - } - ProtocolMessage::Msg1c => { - if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { + if msg.ballot != self.ballot { + return; + } + + if let Some(last_ballot_voted) = msg.last_ballot_voted { + if last_ballot_voted >= self.ballot { + return; + } + } + + if !self.parties_voted_before.contains_key(&routing.sender) { + self.parties_voted_before.insert(routing.sender, msg.last_value_voted); + self.messages_1b_weight += self.cfg.party_weights[routing.sender as usize] as u128; + + if self.messages_1b_weight > self.cfg.threshold { + self.status = PartyStatus::Passed1b + } + } + } } ProtocolMessage::Msg2a => { - if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { + if msg.ballot != self.ballot { + return; + } + + if routing.sender != self.get_leader() { + return; + } + + if let Ok(value_received) = serde_json::from_slice::(msg.value.as_slice()) { + if self.value_selector.verify(&value_received, &self.parties_voted_before) { + self.status = PartyStatus::Passed2a; + self.value_2a = Some(value_received); + } + } + } } ProtocolMessage::Msg2av => { - if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { + if msg.ballot != self.ballot { + return; + } + + if let Ok(value_received) = serde_json::from_slice::(msg.received_value.as_slice()) { + if value_received != self.value_2a.clone().unwrap() { + return; + } + } + + if !self.messages_2av_senders.contains(&routing.sender) { + self.messages_2av_senders.insert(routing.sender); + self.messages_2av_weight += self.cfg.party_weights[routing.sender as usize] as u128; + + if self.messages_2av_weight > self.cfg.threshold { + self.status = PartyStatus::Passed2av + } + } + } } ProtocolMessage::Msg2b => { - if let Ok(msg) = serde_json::from_slice::(msg.as_slice()) {} + if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { + if msg.ballot != self.ballot { + return; + } + + // Only those who submitted 2av + if self.messages_2av_senders.contains(&routing.sender) && !self.messages_2b_senders.contains(&routing.sender) { + self.messages_2b_senders.insert(routing.sender); + self.messages_2b_weight += self.cfg.party_weights[routing.sender as usize] as u128; + + if self.messages_2b_weight > self.cfg.threshold { + self.status = PartyStatus::Passed2b + } + } + } } } } @@ -167,13 +318,86 @@ impl> Party { fn follow_event(&mut self, event: PartyEvent) { // TODO: Implement according to protocol rules. match event { - PartyEvent::Launch1a => {} - PartyEvent::Launch1b => {} - PartyEvent::Launch1c => {} - PartyEvent::Launch2a => {} - PartyEvent::Launch2av => {} - PartyEvent::Launch2b => {} - PartyEvent::Finalize => {} + PartyEvent::Launch1a => { + if self.status != PartyStatus::Launched { + self.status = PartyStatus::Failed; + return; + } + + if self.get_leader() == self.id { + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message1aContent { ballot: self.ballot }).unwrap(), + routing: Message1aContent::get_routing(self.id), + }).unwrap(); + } + } + PartyEvent::Launch1b => { + if self.status != PartyStatus::Passed1a { + self.status = PartyStatus::Failed; + return; + } + + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message1bContent { + ballot: self.ballot, + last_ballot_voted: self.last_ballot_voted.clone(), + last_value_voted: self.last_value_voted.clone(), + }).unwrap(), + routing: Message1bContent::get_routing(self.id), + }).unwrap(); + } + PartyEvent::Launch2a => { + if self.status != PartyStatus::Passed1b { + self.status = PartyStatus::Failed; + return; + } + + if self.get_leader() == self.id { + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message2aContent { + ballot: self.ballot, + value: serde_json::to_vec(&self.get_value()).unwrap(), + }).unwrap(), + routing: Message2aContent::get_routing(self.id), + }).unwrap(); + } + } + PartyEvent::Launch2av => { + if self.status != PartyStatus::Passed2a { + self.status = PartyStatus::Failed; + return; + } + + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message2avContent { + ballot: self.ballot, + received_value: serde_json::to_vec(&self.value_2a.clone().unwrap()).unwrap(), + }).unwrap(), + routing: Message2avContent::get_routing(self.id), + }).unwrap(); + } + PartyEvent::Launch2b => { + if self.status != PartyStatus::Passed2av { + self.status = PartyStatus::Failed; + return; + } + + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message2bContent { + ballot: self.ballot, + }).unwrap(), + routing: Message2bContent::get_routing(self.id), + }).unwrap(); + } + PartyEvent::Finalize => { + if self.status != PartyStatus::Passed2av { + self.status = PartyStatus::Failed; + return; + } + + self.value_sender.send(Ok(self.value_2a.clone().unwrap())).unwrap(); + self.status = PartyStatus::Finished; + } } } } From 08afdaf7b91dc2b2983261cc2d752c72cfd2e6fd Mon Sep 17 00:00:00 2001 From: olegfomenko Date: Sat, 3 Aug 2024 04:31:39 +0300 Subject: [PATCH 03/21] adding stop method --- src/party.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/party.rs b/src/party.rs index 5eb5c98..d7b49f4 100644 --- a/src/party.rs +++ b/src/party.rs @@ -43,6 +43,7 @@ pub(crate) enum PartyEvent { Launch2av, Launch2b, Finalize, + Stop, } /// Party of the BPCon protocol that executes ballot. @@ -89,8 +90,7 @@ pub struct Party> { /// Last value for which party submitted 2b message last_value_voted: Option>, - // TODO: define other state fields if needed. - + /// Local round fields /// 1b round state /// @@ -193,6 +193,10 @@ impl> Party { } } + /// Stop ballot protocol. + pub fn stop_ballot(&mut self) { + self.event_sender.send(PartyEvent::Stop).unwrap(); + } /// Prepare state before running a ballot fn prepare_next_ballot(&mut self) { @@ -398,12 +402,9 @@ impl> Party { self.value_sender.send(Ok(self.value_2a.clone().unwrap())).unwrap(); self.status = PartyStatus::Finished; } + PartyEvent::Stop => { + self.status = PartyStatus::Stopped; + } } } } - -impl> Drop for Party { - fn drop(&mut self) { - self.status = PartyStatus::Stopped - } -} \ No newline at end of file From 29958e7f74e9b6fd7839a988a7a809c7e7a15c9d Mon Sep 17 00:00:00 2001 From: olegfomenko Date: Sat, 3 Aug 2024 04:33:31 +0300 Subject: [PATCH 04/21] fixed party new method --- src/party.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/party.rs b/src/party.rs index d7b49f4..19354c1 100644 --- a/src/party.rs +++ b/src/party.rs @@ -117,17 +117,18 @@ pub struct Party> { impl> Party { pub fn new( id: u64, - value_sender: Sender>, cfg: BPConConfig, value_selector: VS, ) -> ( Self, Receiver, - Sender + Sender, + Receiver>, ) { let (event_sender, event_receiver) = channel(); let (msg_in_sender, msg_in_receiver) = channel(); let (msg_out_sender, msg_out_receiver) = channel(); + let (value_sender, value_receiver) = channel(); ( Self { @@ -152,7 +153,8 @@ impl> Party { messages_2b_weight: 0, }, msg_out_receiver, - msg_in_sender + msg_in_sender, + value_receiver, ) } From dc9b348eb55eaf4d8442449890e8bf4996eb0e4a Mon Sep 17 00:00:00 2001 From: olegfomenko Date: Sat, 3 Aug 2024 04:51:41 +0300 Subject: [PATCH 05/21] removing stop, removing separate channel for receiving value --- src/party.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/party.rs b/src/party.rs index 19354c1..43b2c8e 100644 --- a/src/party.rs +++ b/src/party.rs @@ -65,9 +65,6 @@ pub struct Party> { msg_in_receiver: Receiver, msg_out_sender: Sender, - /// Query to submit result. - value_sender: Sender>, - /// Query to receive and send events that run ballot protocol event_receiver: Receiver, event_sender: Sender, @@ -110,7 +107,6 @@ pub struct Party> { /// messages_2b_senders: HashSet, messages_2b_weight: u128, - } @@ -123,19 +119,16 @@ impl> Party { Self, Receiver, Sender, - Receiver>, ) { let (event_sender, event_receiver) = channel(); let (msg_in_sender, msg_in_receiver) = channel(); let (msg_out_sender, msg_out_receiver) = channel(); - let (value_sender, value_receiver) = channel(); ( Self { id, msg_in_receiver, msg_out_sender, - value_sender, event_receiver, event_sender, cfg, @@ -154,10 +147,13 @@ impl> Party { }, msg_out_receiver, msg_in_sender, - value_receiver, ) } + pub fn ballot(&self) -> u64 { + return self.ballot; + } + pub fn is_launched(&self) -> bool { return !self.is_stopped(); } @@ -166,12 +162,21 @@ impl> Party { return self.status == PartyStatus::Finished || self.status == PartyStatus::Failed || self.status == PartyStatus::Stopped; } - pub fn get_leader(&self) -> u64 { + pub fn get_value_selected(&self) -> Option { + // Only `Finished` status means reached BFT agreement + if self.status == PartyStatus::Finished { + return self.value_2a.clone(); + } + + return None; + } + + fn get_leader(&self) -> u64 { // TODO: implement weight random based on some conditions todo!() } - pub fn get_value(&self) -> V { + fn get_value(&self) -> V { return self.value_selector.select(&self.parties_voted_before); } @@ -195,11 +200,6 @@ impl> Party { } } - /// Stop ballot protocol. - pub fn stop_ballot(&mut self) { - self.event_sender.send(PartyEvent::Stop).unwrap(); - } - /// Prepare state before running a ballot fn prepare_next_ballot(&mut self) { self.status = PartyStatus::None; @@ -401,7 +401,6 @@ impl> Party { return; } - self.value_sender.send(Ok(self.value_2a.clone().unwrap())).unwrap(); self.status = PartyStatus::Finished; } PartyEvent::Stop => { From 1b64ee78850604e40169291a7c9ea0dcbd609996 Mon Sep 17 00:00:00 2001 From: olegfomenko Date: Sat, 3 Aug 2024 04:56:31 +0300 Subject: [PATCH 06/21] fixed comments, fixed warnings, removing stop event --- src/party.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/party.rs b/src/party.rs index 43b2c8e..3b379b8 100644 --- a/src/party.rs +++ b/src/party.rs @@ -4,7 +4,6 @@ use std::cmp::PartialEq; use std::collections::{HashMap, HashSet}; use std::sync::mpsc::{channel, Receiver, Sender}; use crate::{Value, ValueSelector}; -use crate::error::BallotError; use crate::message::{Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent, MessageRouting, MessageWire, ProtocolMessage}; /// BPCon configuration. Includes ballot time bounds, and other stuff. @@ -12,7 +11,7 @@ pub struct BPConConfig { /// Parties weights: `party_weights[i]` corresponds to the i-th party weight pub party_weights: Vec, - /// Threshold weight to define BFT quorum: should be > 66.6% + /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, // TODO: define other config fields. @@ -31,7 +30,6 @@ pub(crate) enum PartyStatus { Passed2b, Finished, Failed, - Stopped, } /// Party events is used for the ballot flow control. @@ -43,7 +41,6 @@ pub(crate) enum PartyEvent { Launch2av, Launch2b, Finalize, - Stop, } /// Party of the BPCon protocol that executes ballot. @@ -159,7 +156,7 @@ impl> Party { } pub fn is_stopped(&self) -> bool { - return self.status == PartyStatus::Finished || self.status == PartyStatus::Failed || self.status == PartyStatus::Stopped; + return self.status == PartyStatus::Finished || self.status == PartyStatus::Failed; } pub fn get_value_selected(&self) -> Option { @@ -182,22 +179,30 @@ impl> Party { /// Start the next ballot. It's expected from the external system to re-run ballot protocol in /// case of failed ballot. - pub async fn launch_ballot(&mut self) { + pub async fn launch_ballot(&mut self) -> Option { self.prepare_next_ballot(); while self.is_launched() { - /// Check for new messages + // Check for new messages if let Ok(msg_wire) = self.msg_in_receiver.try_recv() { self.update_state(msg_wire.content_bytes, msg_wire.routing); } - /// Check for new events + // Check for new events if let Ok(event) = self.event_receiver.try_recv() { self.follow_event(event); } // TODO: emit events to run ballot protocol according to the ballot configuration `BallotConfig` + self.event_sender.send(PartyEvent::Launch1a).unwrap(); + self.event_sender.send(PartyEvent::Launch1b).unwrap(); + self.event_sender.send(PartyEvent::Launch2a).unwrap(); + self.event_sender.send(PartyEvent::Launch2av).unwrap(); + self.event_sender.send(PartyEvent::Launch2b).unwrap(); + self.event_sender.send(PartyEvent::Finalize).unwrap(); } + + return self.get_value_selected(); } /// Prepare state before running a ballot @@ -403,9 +408,6 @@ impl> Party { self.status = PartyStatus::Finished; } - PartyEvent::Stop => { - self.status = PartyStatus::Stopped; - } } } } From e3aa7387328452ffdc4cb6848a1e6e9651e5092e Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 5 Aug 2024 15:52:08 +0300 Subject: [PATCH 07/21] chore: added README.md --- README.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..6d8f44a --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +# BPCon Rust Library + +[![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0) + +This is a generic rust implementation of the `BPCon` consensus mechanism. + +## Library Structure + +### src/party.rs + +Main entity in this implementation is `Party` - it represents member of the consensus. + +External system shall create desired amount of parties. + +We have 2 communication channels - one for sending `MessageWire` - encoded in bytes message and routing information, +and the other for pitching consensus events - this allows for external system to impose custom limitations and rules +regarding runway. + +### src/message.rs + +Definitions of the general message struct, routing information and type-specific contents. + +### src/lib.rs + +Here we present a trait for the value on which consensus is being conducted. Additionally, there is a trait for +defining custom value selection rules, called `ValueSelector`. + From e6bbfbf0659a4614c21054070a2ffc8cf431aaf5 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 5 Aug 2024 15:55:11 +0300 Subject: [PATCH 08/21] chore: cargo fmt --- src/error.rs | 2 +- src/lib.rs | 4 +- src/message.rs | 2 +- src/party.rs | 122 ++++++++++++++++++++++++++++++------------------- 4 files changed, 78 insertions(+), 52 deletions(-) diff --git a/src/error.rs b/src/error.rs index 032fdc9..cb73d7b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ //! Definition of the BPCon errors. pub enum BallotError { // TODO: define errors. -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index 8af143f..63a7bd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,9 @@ -use std::collections::HashMap; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +mod error; pub mod message; pub mod party; -mod error; /// General trait for value itself. pub trait Value: Eq + Serialize + for<'a> Deserialize<'a> + Clone {} diff --git a/src/message.rs b/src/message.rs index d6b423b..9e9eb3c 100644 --- a/src/message.rs +++ b/src/message.rs @@ -113,4 +113,4 @@ impl Message2bContent { msg_type: ProtocolMessage::Msg2b, } } -} \ No newline at end of file +} diff --git a/src/party.rs b/src/party.rs index 3b379b8..c5b5afe 100644 --- a/src/party.rs +++ b/src/party.rs @@ -1,10 +1,13 @@ //! Definition of the BPCon participant structure. +use crate::message::{ + Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent, + MessageRouting, MessageWire, ProtocolMessage, +}; +use crate::{Value, ValueSelector}; use std::cmp::PartialEq; use std::collections::{HashMap, HashSet}; use std::sync::mpsc::{channel, Receiver, Sender}; -use crate::{Value, ValueSelector}; -use crate::message::{Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent, MessageRouting, MessageWire, ProtocolMessage}; /// BPCon configuration. Includes ballot time bounds, and other stuff. pub struct BPConConfig { @@ -13,7 +16,6 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, - // TODO: define other config fields. } @@ -106,17 +108,12 @@ pub struct Party> { messages_2b_weight: u128, } - impl> Party { pub fn new( id: u64, cfg: BPConConfig, value_selector: VS, - ) -> ( - Self, - Receiver, - Sender, - ) { + ) -> (Self, Receiver, Sender) { let (event_sender, event_receiver) = channel(); let (msg_in_sender, msg_in_receiver) = channel(); let (msg_out_sender, msg_out_receiver) = channel(); @@ -256,8 +253,10 @@ impl> Party { } if !self.parties_voted_before.contains_key(&routing.sender) { - self.parties_voted_before.insert(routing.sender, msg.last_value_voted); - self.messages_1b_weight += self.cfg.party_weights[routing.sender as usize] as u128; + self.parties_voted_before + .insert(routing.sender, msg.last_value_voted); + self.messages_1b_weight += + self.cfg.party_weights[routing.sender as usize] as u128; if self.messages_1b_weight > self.cfg.threshold { self.status = PartyStatus::Passed1b @@ -276,7 +275,10 @@ impl> Party { } if let Ok(value_received) = serde_json::from_slice::(msg.value.as_slice()) { - if self.value_selector.verify(&value_received, &self.parties_voted_before) { + if self + .value_selector + .verify(&value_received, &self.parties_voted_before) + { self.status = PartyStatus::Passed2a; self.value_2a = Some(value_received); } @@ -289,7 +291,9 @@ impl> Party { return; } - if let Ok(value_received) = serde_json::from_slice::(msg.received_value.as_slice()) { + if let Ok(value_received) = + serde_json::from_slice::(msg.received_value.as_slice()) + { if value_received != self.value_2a.clone().unwrap() { return; } @@ -297,7 +301,8 @@ impl> Party { if !self.messages_2av_senders.contains(&routing.sender) { self.messages_2av_senders.insert(routing.sender); - self.messages_2av_weight += self.cfg.party_weights[routing.sender as usize] as u128; + self.messages_2av_weight += + self.cfg.party_weights[routing.sender as usize] as u128; if self.messages_2av_weight > self.cfg.threshold { self.status = PartyStatus::Passed2av @@ -312,9 +317,12 @@ impl> Party { } // Only those who submitted 2av - if self.messages_2av_senders.contains(&routing.sender) && !self.messages_2b_senders.contains(&routing.sender) { + if self.messages_2av_senders.contains(&routing.sender) + && !self.messages_2b_senders.contains(&routing.sender) + { self.messages_2b_senders.insert(routing.sender); - self.messages_2b_weight += self.cfg.party_weights[routing.sender as usize] as u128; + self.messages_2b_weight += + self.cfg.party_weights[routing.sender as usize] as u128; if self.messages_2b_weight > self.cfg.threshold { self.status = PartyStatus::Passed2b @@ -336,10 +344,15 @@ impl> Party { } if self.get_leader() == self.id { - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message1aContent { ballot: self.ballot }).unwrap(), - routing: Message1aContent::get_routing(self.id), - }).unwrap(); + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message1aContent { + ballot: self.ballot, + }) + .unwrap(), + routing: Message1aContent::get_routing(self.id), + }) + .unwrap(); } } PartyEvent::Launch1b => { @@ -348,14 +361,17 @@ impl> Party { return; } - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message1bContent { - ballot: self.ballot, - last_ballot_voted: self.last_ballot_voted.clone(), - last_value_voted: self.last_value_voted.clone(), - }).unwrap(), - routing: Message1bContent::get_routing(self.id), - }).unwrap(); + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message1bContent { + ballot: self.ballot, + last_ballot_voted: self.last_ballot_voted.clone(), + last_value_voted: self.last_value_voted.clone(), + }) + .unwrap(), + routing: Message1bContent::get_routing(self.id), + }) + .unwrap(); } PartyEvent::Launch2a => { if self.status != PartyStatus::Passed1b { @@ -364,13 +380,16 @@ impl> Party { } if self.get_leader() == self.id { - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message2aContent { - ballot: self.ballot, - value: serde_json::to_vec(&self.get_value()).unwrap(), - }).unwrap(), - routing: Message2aContent::get_routing(self.id), - }).unwrap(); + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message2aContent { + ballot: self.ballot, + value: serde_json::to_vec(&self.get_value()).unwrap(), + }) + .unwrap(), + routing: Message2aContent::get_routing(self.id), + }) + .unwrap(); } } PartyEvent::Launch2av => { @@ -379,13 +398,17 @@ impl> Party { return; } - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message2avContent { - ballot: self.ballot, - received_value: serde_json::to_vec(&self.value_2a.clone().unwrap()).unwrap(), - }).unwrap(), - routing: Message2avContent::get_routing(self.id), - }).unwrap(); + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message2avContent { + ballot: self.ballot, + received_value: serde_json::to_vec(&self.value_2a.clone().unwrap()) + .unwrap(), + }) + .unwrap(), + routing: Message2avContent::get_routing(self.id), + }) + .unwrap(); } PartyEvent::Launch2b => { if self.status != PartyStatus::Passed2av { @@ -393,12 +416,15 @@ impl> Party { return; } - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message2bContent { - ballot: self.ballot, - }).unwrap(), - routing: Message2bContent::get_routing(self.id), - }).unwrap(); + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message2bContent { + ballot: self.ballot, + }) + .unwrap(), + routing: Message2bContent::get_routing(self.id), + }) + .unwrap(); } PartyEvent::Finalize => { if self.status != PartyStatus::Passed2av { From c03e610e8dbdf56b2c8f72b93ef7655be17220f7 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Tue, 6 Aug 2024 12:26:13 +0300 Subject: [PATCH 09/21] chore: add rust ci --- .github/workflows/rust.yml | 135 +++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 .github/workflows/rust.yml diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..f8d9382 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,135 @@ +name: Rust CI + +on: + pull_request: + branches: + - main + push: + branches: + - main + workflow_dispatch: + +permissions: + contents: read + +jobs: + check: + name: Check + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Cache Cargo registry + uses: actions/cache@v4 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache Cargo build + uses: actions/cache@v4 + with: + path: target + key: ${{ runner.os }}-cargo-build-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-build- + + - name: Install stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Run cargo check + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: check + + test: + name: Test Suite + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + rust: [stable, beta, nightly] + runs-on: ${{ matrix.os }} + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Cache Cargo registry + uses: actions/cache@v4 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache Cargo build + uses: actions/cache@v4 + with: + path: target + key: ${{ runner.os }}-cargo-build-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-build- + + - name: Install ${{ matrix.rust }} toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: ${{ matrix.rust }} + + - name: Run cargo test + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: test + args: --all-features --verbose + + lints: + name: Lints + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Cache Cargo registry + uses: actions/cache@v4 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache Cargo build + uses: actions/cache@v4 + with: + path: target + key: ${{ runner.os }}-cargo-build-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-build- + + - name: Install stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + components: rustfmt, clippy + + - name: Run cargo fmt + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: fmt + args: --all -- --check + + - name: Run cargo clippy + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: clippy + args: --all-targets -- -D warnings From 7f462406b11c5f07fdd04e43f6e283ec6d4fb25a Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Tue, 6 Aug 2024 12:42:39 +0300 Subject: [PATCH 10/21] fix: resolve linter problems --- src/error.rs | 1 + src/party.rs | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/error.rs b/src/error.rs index cb73d7b..0475018 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ //! Definition of the BPCon errors. +#[allow(dead_code)] pub enum BallotError { // TODO: define errors. } diff --git a/src/party.rs b/src/party.rs index c5b5afe..510d971 100644 --- a/src/party.rs +++ b/src/party.rs @@ -145,15 +145,15 @@ impl> Party { } pub fn ballot(&self) -> u64 { - return self.ballot; + self.ballot } pub fn is_launched(&self) -> bool { - return !self.is_stopped(); + !self.is_stopped() } pub fn is_stopped(&self) -> bool { - return self.status == PartyStatus::Finished || self.status == PartyStatus::Failed; + self.status == PartyStatus::Finished || self.status == PartyStatus::Failed } pub fn get_value_selected(&self) -> Option { @@ -162,7 +162,7 @@ impl> Party { return self.value_2a.clone(); } - return None; + None } fn get_leader(&self) -> u64 { @@ -171,7 +171,7 @@ impl> Party { } fn get_value(&self) -> V { - return self.value_selector.select(&self.parties_voted_before); + self.value_selector.select(&self.parties_voted_before) } /// Start the next ballot. It's expected from the external system to re-run ballot protocol in @@ -199,13 +199,13 @@ impl> Party { self.event_sender.send(PartyEvent::Finalize).unwrap(); } - return self.get_value_selected(); + self.get_value_selected() } /// Prepare state before running a ballot fn prepare_next_ballot(&mut self) { self.status = PartyStatus::None; - self.ballot = self.ballot + 1; + self.ballot += 1; // Clean state self.parties_voted_before = HashMap::new(); @@ -217,8 +217,8 @@ impl> Party { self.messages_2b_weight = 0; // Cleaning channels - while let Ok(_) = self.event_receiver.try_recv() {} - while let Ok(_) = self.msg_in_receiver.try_recv() {} + while self.event_receiver.try_recv().is_ok() {} + while self.msg_in_receiver.try_recv().is_ok() {} self.status = PartyStatus::Launched; } @@ -252,9 +252,10 @@ impl> Party { } } - if !self.parties_voted_before.contains_key(&routing.sender) { - self.parties_voted_before - .insert(routing.sender, msg.last_value_voted); + if let std::collections::hash_map::Entry::Vacant(e) = + self.parties_voted_before.entry(routing.sender) + { + e.insert(msg.last_value_voted); self.messages_1b_weight += self.cfg.party_weights[routing.sender as usize] as u128; @@ -365,7 +366,7 @@ impl> Party { .send(MessageWire { content_bytes: serde_json::to_vec(&Message1bContent { ballot: self.ballot, - last_ballot_voted: self.last_ballot_voted.clone(), + last_ballot_voted: self.last_ballot_voted, last_value_voted: self.last_value_voted.clone(), }) .unwrap(), From 6552d91a877c3576848f5ad4e8900418e9555a50 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Wed, 7 Aug 2024 16:43:39 +0300 Subject: [PATCH 11/21] chore: add docs ci --- .github/workflows/docs.yml | 52 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 .github/workflows/docs.yml diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 0000000..b7b02e4 --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,52 @@ +name: Deploy Rust Docs to GitHub Pages + +on: + push: + branches: + - main + +concurrency: + group: "pages" + cancel-in-progress: false + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Install stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Build Rust Documentation + run: | + cargo doc --no-deps --document-private-items + echo "" > target/doc/index.html + + - name: Upload artifact + uses: actions/upload-pages-artifact@v3 + with: + path: ./target/doc + + deploy: + runs-on: ubuntu-latest + needs: build + + permissions: + pages: write + id-token: write + + environment: + name: docs + url: ${{ steps.deployment.outputs.page_url }} + + steps: + - name: Deploy to GitHub Pages + id: deployment + uses: actions/deploy-pages@v4 From 894d1a27c799c2158f37da8d6f1c28136444e1ce Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Fri, 9 Aug 2024 19:23:48 +0300 Subject: [PATCH 12/21] feat: implement leader election and error handling --- Cargo.toml | 1 + src/error.rs | 7 +- src/party.rs | 342 +++++++++++++++++++++++++++------------------------ 3 files changed, 185 insertions(+), 165 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e6fc552..b3f34fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,4 @@ edition = "2021" [dependencies] serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.122" +rand = "0.9.0-alpha.2" diff --git a/src/error.rs b/src/error.rs index 0475018..4bf5948 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,8 @@ //! Definition of the BPCon errors. -#[allow(dead_code)] +#[derive(Debug)] pub enum BallotError { - // TODO: define errors. + MessageParsing(String), + InvalidState(String), + Communication(String), + LeaderElection, } diff --git a/src/party.rs b/src/party.rs index 510d971..4fcf245 100644 --- a/src/party.rs +++ b/src/party.rs @@ -5,11 +5,13 @@ use crate::message::{ MessageRouting, MessageWire, ProtocolMessage, }; use crate::{Value, ValueSelector}; +use crate::error::BallotError; use std::cmp::PartialEq; use std::collections::{HashMap, HashSet}; use std::sync::mpsc::{channel, Receiver, Sender}; +use rand::Rng; -/// BPCon configuration. Includes ballot time bounds, and other stuff. +/// BPCon configuration. Includes ballot time bounds and other stuff. pub struct BPConConfig { /// Parties weights: `party_weights[i]` corresponds to the i-th party weight pub party_weights: Vec, @@ -165,9 +167,23 @@ impl> Party { None } - fn get_leader(&self) -> u64 { - // TODO: implement weight random based on some conditions - todo!() + fn get_leader(&self) -> Result { + let total_weight: u64 = self.cfg.party_weights.iter().sum(); + if total_weight == 0 { + return Err(BallotError::LeaderElection); + } + + let mut rng = rand::thread_rng(); + let random_value: u64 = rng.gen_range(0..total_weight); + + let mut cumulative_weight = 0; + for (i, &weight) in self.cfg.party_weights.iter().enumerate() { + cumulative_weight += weight; + if random_value < cumulative_weight { + return Ok(i as u64); + } + } + Err(BallotError::LeaderElection) } fn get_value(&self) -> V { @@ -176,30 +192,52 @@ impl> Party { /// Start the next ballot. It's expected from the external system to re-run ballot protocol in /// case of failed ballot. - pub async fn launch_ballot(&mut self) -> Option { + pub async fn launch_ballot(&mut self) -> Result, BallotError> { self.prepare_next_ballot(); while self.is_launched() { - // Check for new messages if let Ok(msg_wire) = self.msg_in_receiver.try_recv() { - self.update_state(msg_wire.content_bytes, msg_wire.routing); + if let Err(err) = self.update_state(msg_wire.content_bytes, msg_wire.routing) { + self.status = PartyStatus::Failed; + return Err(err); + } } - // Check for new events if let Ok(event) = self.event_receiver.try_recv() { - self.follow_event(event); + if let Err(err) = self.follow_event(event) { + self.status = PartyStatus::Failed; + return Err(err); + } } - // TODO: emit events to run ballot protocol according to the ballot configuration `BallotConfig` - self.event_sender.send(PartyEvent::Launch1a).unwrap(); - self.event_sender.send(PartyEvent::Launch1b).unwrap(); - self.event_sender.send(PartyEvent::Launch2a).unwrap(); - self.event_sender.send(PartyEvent::Launch2av).unwrap(); - self.event_sender.send(PartyEvent::Launch2b).unwrap(); - self.event_sender.send(PartyEvent::Finalize).unwrap(); + // TODO: Emit events to run ballot protocol according to the ballot configuration + if self.event_sender.send(PartyEvent::Launch1a).is_err() { + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("Failed to send Launch1a event".into())); + } + if self.event_sender.send(PartyEvent::Launch1b).is_err() { + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("Failed to send Launch1b event".into())); + } + if self.event_sender.send(PartyEvent::Launch2a).is_err() { + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("Failed to send Launch2a event".into())); + } + if self.event_sender.send(PartyEvent::Launch2av).is_err() { + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("Failed to send Launch2av event".into())); + } + if self.event_sender.send(PartyEvent::Launch2b).is_err() { + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("Failed to send Launch2b event".into())); + } + if self.event_sender.send(PartyEvent::Finalize).is_err() { + self.status = PartyStatus::Failed; + return Err(BallotError::Communication("Failed to send Finalize event".into())); + } } - self.get_value_selected() + Ok(self.get_value_selected()) } /// Prepare state before running a ballot @@ -224,217 +262,195 @@ impl> Party { } /// Update party's state based on message type. - fn update_state(&mut self, m: Vec, routing: MessageRouting) { - // TODO: Implement according to protocol rules. + fn update_state(&mut self, m: Vec, routing: MessageRouting) -> Result<(), BallotError> { match routing.msg_type { ProtocolMessage::Msg1a => { - if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { - if msg.ballot != self.ballot { - return; - } + let msg: Message1aContent = serde_json::from_slice(m.as_slice()) + .map_err(|_| BallotError::MessageParsing("Failed to parse Msg1a".into()))?; - if routing.sender != self.get_leader() { - return; - } + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState("Ballot number mismatch in Msg1a".into())); + } - self.status = PartyStatus::Passed1a; + if routing.sender != self.get_leader()? { + return Err(BallotError::InvalidState("Invalid leader in Msg1a".into())); } + + self.status = PartyStatus::Passed1a; } ProtocolMessage::Msg1b => { - if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { - if msg.ballot != self.ballot { - return; - } + let msg: Message1bContent = serde_json::from_slice(m.as_slice()) + .map_err(|_| BallotError::MessageParsing("Failed to parse Msg1b".into()))?; - if let Some(last_ballot_voted) = msg.last_ballot_voted { - if last_ballot_voted >= self.ballot { - return; - } + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState("Ballot number mismatch in Msg1b".into())); + } + + if let Some(last_ballot_voted) = msg.last_ballot_voted { + if last_ballot_voted >= self.ballot { + return Err(BallotError::InvalidState("Received outdated 1b message".into())); } + } - if let std::collections::hash_map::Entry::Vacant(e) = - self.parties_voted_before.entry(routing.sender) - { - e.insert(msg.last_value_voted); - self.messages_1b_weight += - self.cfg.party_weights[routing.sender as usize] as u128; + if let std::collections::hash_map::Entry::Vacant(e) = + self.parties_voted_before.entry(routing.sender) + { + e.insert(msg.last_value_voted); + self.messages_1b_weight += + self.cfg.party_weights[routing.sender as usize] as u128; - if self.messages_1b_weight > self.cfg.threshold { - self.status = PartyStatus::Passed1b - } + if self.messages_1b_weight > self.cfg.threshold { + self.status = PartyStatus::Passed1b; } } } ProtocolMessage::Msg2a => { - if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { - if msg.ballot != self.ballot { - return; - } + let msg: Message2aContent = serde_json::from_slice(m.as_slice()) + .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2a".into()))?; - if routing.sender != self.get_leader() { - return; - } + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState("Ballot number mismatch in Msg2a".into())); + } - if let Ok(value_received) = serde_json::from_slice::(msg.value.as_slice()) { - if self - .value_selector - .verify(&value_received, &self.parties_voted_before) - { - self.status = PartyStatus::Passed2a; - self.value_2a = Some(value_received); - } - } + if routing.sender != self.get_leader()? { + return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); + } + + let value_received: V = serde_json::from_slice(msg.value.as_slice()) + .map_err(|_| BallotError::MessageParsing("Failed to parse value in Msg2a".into()))?; + + if self.value_selector.verify(&value_received, &self.parties_voted_before) { + self.status = PartyStatus::Passed2a; + self.value_2a = Some(value_received); + } else { + return Err(BallotError::InvalidState("Failed to verify value in Msg2a".into())); } } ProtocolMessage::Msg2av => { - if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { - if msg.ballot != self.ballot { - return; - } + let msg: Message2avContent = serde_json::from_slice(m.as_slice()) + .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2av".into()))?; - if let Ok(value_received) = - serde_json::from_slice::(msg.received_value.as_slice()) - { - if value_received != self.value_2a.clone().unwrap() { - return; - } - } + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState("Ballot number mismatch in Msg2av".into())); + } + + let value_received: V = serde_json::from_slice(msg.received_value.as_slice()) + .map_err(|_| BallotError::MessageParsing("Failed to parse value in Msg2av".into()))?; + + if value_received != self.value_2a.clone().unwrap() { + return Err(BallotError::InvalidState("Received different value in Msg2av".into())); + } - if !self.messages_2av_senders.contains(&routing.sender) { - self.messages_2av_senders.insert(routing.sender); - self.messages_2av_weight += - self.cfg.party_weights[routing.sender as usize] as u128; + if !self.messages_2av_senders.contains(&routing.sender) { + self.messages_2av_senders.insert(routing.sender); + self.messages_2av_weight += + self.cfg.party_weights[routing.sender as usize] as u128; - if self.messages_2av_weight > self.cfg.threshold { - self.status = PartyStatus::Passed2av - } + if self.messages_2av_weight > self.cfg.threshold { + self.status = PartyStatus::Passed2av; } } } ProtocolMessage::Msg2b => { - if let Ok(msg) = serde_json::from_slice::(m.as_slice()) { - if msg.ballot != self.ballot { - return; - } + let msg: Message2bContent = serde_json::from_slice(m.as_slice()) + .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2b".into()))?; + + if msg.ballot != self.ballot { + return Err(BallotError::InvalidState("Ballot number mismatch in Msg2b".into())); + } - // Only those who submitted 2av - if self.messages_2av_senders.contains(&routing.sender) - && !self.messages_2b_senders.contains(&routing.sender) - { - self.messages_2b_senders.insert(routing.sender); - self.messages_2b_weight += - self.cfg.party_weights[routing.sender as usize] as u128; - - if self.messages_2b_weight > self.cfg.threshold { - self.status = PartyStatus::Passed2b - } + if self.messages_2av_senders.contains(&routing.sender) + && !self.messages_2b_senders.contains(&routing.sender) + { + self.messages_2b_senders.insert(routing.sender); + self.messages_2b_weight += + self.cfg.party_weights[routing.sender as usize] as u128; + + if self.messages_2b_weight > self.cfg.threshold { + self.status = PartyStatus::Passed2b; } } } } + Ok(()) } /// Executes ballot actions according to the received event. - fn follow_event(&mut self, event: PartyEvent) { - // TODO: Implement according to protocol rules. + fn follow_event(&mut self, event: PartyEvent) -> Result<(), BallotError> { match event { PartyEvent::Launch1a => { if self.status != PartyStatus::Launched { - self.status = PartyStatus::Failed; - return; + return Err(BallotError::InvalidState("Cannot launch 1a, incorrect state".into())); } - - if self.get_leader() == self.id { - self.msg_out_sender - .send(MessageWire { - content_bytes: serde_json::to_vec(&Message1aContent { - ballot: self.ballot, - }) - .unwrap(), - routing: Message1aContent::get_routing(self.id), - }) - .unwrap(); + if self.get_leader()? == self.id { + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message1aContent { ballot: self.ballot }) + .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg1a".into()))?, + routing: Message1aContent::get_routing(self.id), + }).map_err(|_| BallotError::Communication("Failed to send Msg1a".into()))?; } } PartyEvent::Launch1b => { if self.status != PartyStatus::Passed1a { - self.status = PartyStatus::Failed; - return; + return Err(BallotError::InvalidState("Cannot launch 1b, incorrect state".into())); } - - self.msg_out_sender - .send(MessageWire { - content_bytes: serde_json::to_vec(&Message1bContent { - ballot: self.ballot, - last_ballot_voted: self.last_ballot_voted, - last_value_voted: self.last_value_voted.clone(), - }) - .unwrap(), - routing: Message1bContent::get_routing(self.id), + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message1bContent { + ballot: self.ballot, + last_ballot_voted: self.last_ballot_voted, + last_value_voted: self.last_value_voted.clone(), }) - .unwrap(); + .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg1b".into()))?, + routing: Message1bContent::get_routing(self.id), + }).map_err(|_| BallotError::Communication("Failed to send Msg1b".into()))?; } PartyEvent::Launch2a => { if self.status != PartyStatus::Passed1b { - self.status = PartyStatus::Failed; - return; + return Err(BallotError::InvalidState("Cannot launch 2a, incorrect state".into())); } - - if self.get_leader() == self.id { - self.msg_out_sender - .send(MessageWire { - content_bytes: serde_json::to_vec(&Message2aContent { - ballot: self.ballot, - value: serde_json::to_vec(&self.get_value()).unwrap(), - }) - .unwrap(), - routing: Message2aContent::get_routing(self.id), + if self.get_leader()? == self.id { + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message2aContent { + ballot: self.ballot, + value: serde_json::to_vec(&self.get_value()) + .map_err(|_| BallotError::MessageParsing("Failed to serialize value for Msg2a".into()))?, }) - .unwrap(); + .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2a".into()))?, + routing: Message2aContent::get_routing(self.id), + }).map_err(|_| BallotError::Communication("Failed to send Msg2a".into()))?; } } PartyEvent::Launch2av => { if self.status != PartyStatus::Passed2a { - self.status = PartyStatus::Failed; - return; + return Err(BallotError::InvalidState("Cannot launch 2av, incorrect state".into())); } - - self.msg_out_sender - .send(MessageWire { - content_bytes: serde_json::to_vec(&Message2avContent { - ballot: self.ballot, - received_value: serde_json::to_vec(&self.value_2a.clone().unwrap()) - .unwrap(), - }) - .unwrap(), - routing: Message2avContent::get_routing(self.id), + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message2avContent { + ballot: self.ballot, + received_value: serde_json::to_vec(&self.value_2a.clone().unwrap()) + .map_err(|_| BallotError::MessageParsing("Failed to serialize value for Msg2av".into()))?, }) - .unwrap(); + .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2av".into()))?, + routing: Message2avContent::get_routing(self.id), + }).map_err(|_| BallotError::Communication("Failed to send Msg2av".into()))?; } PartyEvent::Launch2b => { if self.status != PartyStatus::Passed2av { - self.status = PartyStatus::Failed; - return; + return Err(BallotError::InvalidState("Cannot launch 2b, incorrect state".into())); } - - self.msg_out_sender - .send(MessageWire { - content_bytes: serde_json::to_vec(&Message2bContent { - ballot: self.ballot, - }) - .unwrap(), - routing: Message2bContent::get_routing(self.id), - }) - .unwrap(); + self.msg_out_sender.send(MessageWire { + content_bytes: serde_json::to_vec(&Message2bContent { ballot: self.ballot }) + .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2b".into()))?, + routing: Message2bContent::get_routing(self.id), + }).map_err(|_| BallotError::Communication("Failed to send Msg2b".into()))?; } PartyEvent::Finalize => { - if self.status != PartyStatus::Passed2av { - self.status = PartyStatus::Failed; - return; + if self.status != PartyStatus::Passed2b { + return Err(BallotError::InvalidState("Cannot finalize, incorrect state".into())); } - self.status = PartyStatus::Finished; } } + Ok(()) } } From c3a2523f11c84e0767b328637dfd16f0b51b06bc Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Sun, 11 Aug 2024 00:00:26 +0300 Subject: [PATCH 13/21] feat: added tests --- src/error.rs | 25 ++++++++++++++++ src/party.rs | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 4bf5948..a82211c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,3 +6,28 @@ pub enum BallotError { Communication(String), LeaderElection, } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ballot_error_message_parsing() { + let error = BallotError::MessageParsing("Parsing failed".into()); + if let BallotError::MessageParsing(msg) = error { + assert_eq!(msg, "Parsing failed"); + } else { + panic!("Expected MessageParsing error"); + } + } + + #[test] + fn test_ballot_error_invalid_state() { + let error = BallotError::InvalidState("Invalid state transition".into()); + if let BallotError::InvalidState(msg) = error { + assert_eq!(msg, "Invalid state transition"); + } else { + panic!("Expected InvalidState error"); + } + } +} diff --git a/src/party.rs b/src/party.rs index 4fcf245..cea4d01 100644 --- a/src/party.rs +++ b/src/party.rs @@ -23,7 +23,7 @@ pub struct BPConConfig { /// Party status defines the statuses of the ballot for the particular participant /// depending on local calculations. -#[derive(PartialEq)] +#[derive(PartialEq,Debug)] pub(crate) enum PartyStatus { None, Launched, @@ -454,3 +454,85 @@ impl> Party { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + use serde::{Deserialize, Serialize}; + use std::collections::HashMap; + + // Mock implementation of Value + #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] + struct MockValue(u64); // Simple mock type wrapping an integer + + impl Value for MockValue {} + + // Mock implementation of ValueSelector + struct MockValueSelector; + + impl ValueSelector for MockValueSelector { + fn verify(&self, _v: &MockValue, _m: &HashMap>>) -> bool { + true // For testing, always return true + } + + fn select(&self, _m: &HashMap>>) -> MockValue { + MockValue(42) // For testing, always return the same value + } + } + + #[test] + fn test_get_leader_simple_case() { + let cfg = BPConConfig { + party_weights: vec![1, 1, 1], // Simple case with equal weights + threshold: 2, + }; + + let party = Party::::new(0, cfg, MockValueSelector).0; + + // Call get_leader multiple times to check distribution + let leader = party.get_leader().unwrap(); + assert!(leader <= 2, "Leader index out of bounds"); + } + + #[test] + fn test_get_leader_weighted() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], // Weighted case + threshold: 4, + }; + + let party = Party::::new(0, cfg, MockValueSelector).0; + + // Call get_leader multiple times to check distribution + let leader = party.get_leader().unwrap(); + assert!(leader <= 2, "Leader index out of bounds"); + } + + #[test] + fn test_update_state_msg1a() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Launched; + party.ballot = 1; + + let msg = Message1aContent { ballot: 1 }; + let routing = MessageRouting { + sender: 0, + receivers: vec![1], // Fix: use `receivers` instead of `receiver` + is_broadcast: false, + msg_type: ProtocolMessage::Msg1a, + }; + + let msg_wire = MessageWire { + content_bytes: serde_json::to_vec(&msg).unwrap(), + routing, + }; + + party.update_state(msg_wire.content_bytes, msg_wire.routing).unwrap(); + assert_eq!(party.status, PartyStatus::Passed1a); + } +} From 11ac9669f8157202963653297db92ccc96dde1fc Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Sun, 11 Aug 2024 00:17:26 +0300 Subject: [PATCH 14/21] chore: cargo fmt --- src/party.rs | 204 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 142 insertions(+), 62 deletions(-) diff --git a/src/party.rs b/src/party.rs index cea4d01..41e0816 100644 --- a/src/party.rs +++ b/src/party.rs @@ -1,15 +1,15 @@ //! Definition of the BPCon participant structure. +use crate::error::BallotError; use crate::message::{ Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent, MessageRouting, MessageWire, ProtocolMessage, }; use crate::{Value, ValueSelector}; -use crate::error::BallotError; +use rand::Rng; use std::cmp::PartialEq; use std::collections::{HashMap, HashSet}; use std::sync::mpsc::{channel, Receiver, Sender}; -use rand::Rng; /// BPCon configuration. Includes ballot time bounds and other stuff. pub struct BPConConfig { @@ -23,7 +23,7 @@ pub struct BPConConfig { /// Party status defines the statuses of the ballot for the particular participant /// depending on local calculations. -#[derive(PartialEq,Debug)] +#[derive(PartialEq, Debug)] pub(crate) enum PartyStatus { None, Launched, @@ -213,27 +213,39 @@ impl> Party { // TODO: Emit events to run ballot protocol according to the ballot configuration if self.event_sender.send(PartyEvent::Launch1a).is_err() { self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Failed to send Launch1a event".into())); + return Err(BallotError::Communication( + "Failed to send Launch1a event".into(), + )); } if self.event_sender.send(PartyEvent::Launch1b).is_err() { self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Failed to send Launch1b event".into())); + return Err(BallotError::Communication( + "Failed to send Launch1b event".into(), + )); } if self.event_sender.send(PartyEvent::Launch2a).is_err() { self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Failed to send Launch2a event".into())); + return Err(BallotError::Communication( + "Failed to send Launch2a event".into(), + )); } if self.event_sender.send(PartyEvent::Launch2av).is_err() { self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Failed to send Launch2av event".into())); + return Err(BallotError::Communication( + "Failed to send Launch2av event".into(), + )); } if self.event_sender.send(PartyEvent::Launch2b).is_err() { self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Failed to send Launch2b event".into())); + return Err(BallotError::Communication( + "Failed to send Launch2b event".into(), + )); } if self.event_sender.send(PartyEvent::Finalize).is_err() { self.status = PartyStatus::Failed; - return Err(BallotError::Communication("Failed to send Finalize event".into())); + return Err(BallotError::Communication( + "Failed to send Finalize event".into(), + )); } } @@ -269,7 +281,9 @@ impl> Party { .map_err(|_| BallotError::MessageParsing("Failed to parse Msg1a".into()))?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState("Ballot number mismatch in Msg1a".into())); + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg1a".into(), + )); } if routing.sender != self.get_leader()? { @@ -283,12 +297,16 @@ impl> Party { .map_err(|_| BallotError::MessageParsing("Failed to parse Msg1b".into()))?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState("Ballot number mismatch in Msg1b".into())); + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg1b".into(), + )); } if let Some(last_ballot_voted) = msg.last_ballot_voted { if last_ballot_voted >= self.ballot { - return Err(BallotError::InvalidState("Received outdated 1b message".into())); + return Err(BallotError::InvalidState( + "Received outdated 1b message".into(), + )); } } @@ -309,21 +327,30 @@ impl> Party { .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2a".into()))?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState("Ballot number mismatch in Msg2a".into())); + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg2a".into(), + )); } if routing.sender != self.get_leader()? { return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); } - let value_received: V = serde_json::from_slice(msg.value.as_slice()) - .map_err(|_| BallotError::MessageParsing("Failed to parse value in Msg2a".into()))?; + let value_received: V = + serde_json::from_slice(msg.value.as_slice()).map_err(|_| { + BallotError::MessageParsing("Failed to parse value in Msg2a".into()) + })?; - if self.value_selector.verify(&value_received, &self.parties_voted_before) { + if self + .value_selector + .verify(&value_received, &self.parties_voted_before) + { self.status = PartyStatus::Passed2a; self.value_2a = Some(value_received); } else { - return Err(BallotError::InvalidState("Failed to verify value in Msg2a".into())); + return Err(BallotError::InvalidState( + "Failed to verify value in Msg2a".into(), + )); } } ProtocolMessage::Msg2av => { @@ -331,14 +358,20 @@ impl> Party { .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2av".into()))?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState("Ballot number mismatch in Msg2av".into())); + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg2av".into(), + )); } let value_received: V = serde_json::from_slice(msg.received_value.as_slice()) - .map_err(|_| BallotError::MessageParsing("Failed to parse value in Msg2av".into()))?; + .map_err(|_| { + BallotError::MessageParsing("Failed to parse value in Msg2av".into()) + })?; if value_received != self.value_2a.clone().unwrap() { - return Err(BallotError::InvalidState("Received different value in Msg2av".into())); + return Err(BallotError::InvalidState( + "Received different value in Msg2av".into(), + )); } if !self.messages_2av_senders.contains(&routing.sender) { @@ -356,7 +389,9 @@ impl> Party { .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2b".into()))?; if msg.ballot != self.ballot { - return Err(BallotError::InvalidState("Ballot number mismatch in Msg2b".into())); + return Err(BallotError::InvalidState( + "Ballot number mismatch in Msg2b".into(), + )); } if self.messages_2av_senders.contains(&routing.sender) @@ -380,73 +415,116 @@ impl> Party { match event { PartyEvent::Launch1a => { if self.status != PartyStatus::Launched { - return Err(BallotError::InvalidState("Cannot launch 1a, incorrect state".into())); + return Err(BallotError::InvalidState( + "Cannot launch 1a, incorrect state".into(), + )); } if self.get_leader()? == self.id { - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message1aContent { ballot: self.ballot }) - .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg1a".into()))?, - routing: Message1aContent::get_routing(self.id), - }).map_err(|_| BallotError::Communication("Failed to send Msg1a".into()))?; + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message1aContent { + ballot: self.ballot, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg1a".into()) + })?, + routing: Message1aContent::get_routing(self.id), + }) + .map_err(|_| BallotError::Communication("Failed to send Msg1a".into()))?; } } PartyEvent::Launch1b => { if self.status != PartyStatus::Passed1a { - return Err(BallotError::InvalidState("Cannot launch 1b, incorrect state".into())); + return Err(BallotError::InvalidState( + "Cannot launch 1b, incorrect state".into(), + )); } - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message1bContent { - ballot: self.ballot, - last_ballot_voted: self.last_ballot_voted, - last_value_voted: self.last_value_voted.clone(), + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message1bContent { + ballot: self.ballot, + last_ballot_voted: self.last_ballot_voted, + last_value_voted: self.last_value_voted.clone(), + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg1b".into()) + })?, + routing: Message1bContent::get_routing(self.id), }) - .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg1b".into()))?, - routing: Message1bContent::get_routing(self.id), - }).map_err(|_| BallotError::Communication("Failed to send Msg1b".into()))?; + .map_err(|_| BallotError::Communication("Failed to send Msg1b".into()))?; } PartyEvent::Launch2a => { if self.status != PartyStatus::Passed1b { - return Err(BallotError::InvalidState("Cannot launch 2a, incorrect state".into())); + return Err(BallotError::InvalidState( + "Cannot launch 2a, incorrect state".into(), + )); } if self.get_leader()? == self.id { - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message2aContent { - ballot: self.ballot, - value: serde_json::to_vec(&self.get_value()) - .map_err(|_| BallotError::MessageParsing("Failed to serialize value for Msg2a".into()))?, + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message2aContent { + ballot: self.ballot, + value: serde_json::to_vec(&self.get_value()).map_err(|_| { + BallotError::MessageParsing( + "Failed to serialize value for Msg2a".into(), + ) + })?, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg2a".into()) + })?, + routing: Message2aContent::get_routing(self.id), }) - .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2a".into()))?, - routing: Message2aContent::get_routing(self.id), - }).map_err(|_| BallotError::Communication("Failed to send Msg2a".into()))?; + .map_err(|_| BallotError::Communication("Failed to send Msg2a".into()))?; } } PartyEvent::Launch2av => { if self.status != PartyStatus::Passed2a { - return Err(BallotError::InvalidState("Cannot launch 2av, incorrect state".into())); + return Err(BallotError::InvalidState( + "Cannot launch 2av, incorrect state".into(), + )); } - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message2avContent { - ballot: self.ballot, - received_value: serde_json::to_vec(&self.value_2a.clone().unwrap()) - .map_err(|_| BallotError::MessageParsing("Failed to serialize value for Msg2av".into()))?, + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message2avContent { + ballot: self.ballot, + received_value: serde_json::to_vec(&self.value_2a.clone().unwrap()) + .map_err(|_| { + BallotError::MessageParsing( + "Failed to serialize value for Msg2av".into(), + ) + })?, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg2av".into()) + })?, + routing: Message2avContent::get_routing(self.id), }) - .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2av".into()))?, - routing: Message2avContent::get_routing(self.id), - }).map_err(|_| BallotError::Communication("Failed to send Msg2av".into()))?; + .map_err(|_| BallotError::Communication("Failed to send Msg2av".into()))?; } PartyEvent::Launch2b => { if self.status != PartyStatus::Passed2av { - return Err(BallotError::InvalidState("Cannot launch 2b, incorrect state".into())); + return Err(BallotError::InvalidState( + "Cannot launch 2b, incorrect state".into(), + )); } - self.msg_out_sender.send(MessageWire { - content_bytes: serde_json::to_vec(&Message2bContent { ballot: self.ballot }) - .map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2b".into()))?, - routing: Message2bContent::get_routing(self.id), - }).map_err(|_| BallotError::Communication("Failed to send Msg2b".into()))?; + self.msg_out_sender + .send(MessageWire { + content_bytes: serde_json::to_vec(&Message2bContent { + ballot: self.ballot, + }) + .map_err(|_| { + BallotError::MessageParsing("Failed to serialize Msg2b".into()) + })?, + routing: Message2bContent::get_routing(self.id), + }) + .map_err(|_| BallotError::Communication("Failed to send Msg2b".into()))?; } PartyEvent::Finalize => { if self.status != PartyStatus::Passed2b { - return Err(BallotError::InvalidState("Cannot finalize, incorrect state".into())); + return Err(BallotError::InvalidState( + "Cannot finalize, incorrect state".into(), + )); } self.status = PartyStatus::Finished; } @@ -532,7 +610,9 @@ mod tests { routing, }; - party.update_state(msg_wire.content_bytes, msg_wire.routing).unwrap(); + party + .update_state(msg_wire.content_bytes, msg_wire.routing) + .unwrap(); assert_eq!(party.status, PartyStatus::Passed1a); } } From dc43a8211aa11f69c90c1961aa326c71f2737862 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Sun, 11 Aug 2024 00:49:04 +0300 Subject: [PATCH 15/21] fix: resolve leader election issues --- Cargo.toml | 1 + src/error.rs | 21 +++++++++- src/party.rs | 113 +++++++++++++++++++++++++++++++-------------------- 3 files changed, 90 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b3f34fd..4b972ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.122" rand = "0.9.0-alpha.2" +log = "0.4.22" diff --git a/src/error.rs b/src/error.rs index a82211c..1d031e5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,10 +1,29 @@ //! Definition of the BPCon errors. + +use std::fmt; + #[derive(Debug)] pub enum BallotError { MessageParsing(String), InvalidState(String), Communication(String), - LeaderElection, +} + +impl fmt::Display for BallotError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + BallotError::MessageParsing(ref err) => write!(f, "Message parsing error: {}", err), + BallotError::InvalidState(ref err) => write!(f, "Invalid state error: {}", err), + BallotError::Communication(ref err) => write!(f, "Communication error: {}", err), + } + } +} + +impl std::error::Error for BallotError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + // Since these are all simple String errors, there is no underlying source error. + None + } } #[cfg(test)] diff --git a/src/party.rs b/src/party.rs index 41e0816..51d9f7f 100644 --- a/src/party.rs +++ b/src/party.rs @@ -18,6 +18,7 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, + pub leader: u64, // TODO: define other config fields. } @@ -110,6 +111,46 @@ pub struct Party> { messages_2b_weight: u128, } +#[derive(Debug)] +pub enum LeaderElectionError { + ZeroWeightSum, + ElectionFailed, +} + +impl std::fmt::Display for LeaderElectionError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match *self { + LeaderElectionError::ZeroWeightSum => { + write!(f, "party weights sum must be positive value") + } + LeaderElectionError::ElectionFailed => write!(f, "leader election failed"), + } + } +} + +impl std::error::Error for LeaderElectionError {} + +/// Compute leader in a weighed randomized manner. +/// Use this function for inter-ballot instantiation of config. +pub fn compute_leader(party_weights: Vec) -> Result { + let total_weight: u64 = party_weights.iter().sum(); + if total_weight == 0 { + return Err(LeaderElectionError::ZeroWeightSum); + } + + let mut rng = rand::thread_rng(); + let random_value: u64 = rng.gen_range(0..total_weight); + + let mut cumulative_weight = 0; + for (i, &weight) in party_weights.iter().enumerate() { + cumulative_weight += weight; + if random_value < cumulative_weight { + return Ok(i as u64); + } + } + Err(LeaderElectionError::ElectionFailed) +} + impl> Party { pub fn new( id: u64, @@ -167,25 +208,6 @@ impl> Party { None } - fn get_leader(&self) -> Result { - let total_weight: u64 = self.cfg.party_weights.iter().sum(); - if total_weight == 0 { - return Err(BallotError::LeaderElection); - } - - let mut rng = rand::thread_rng(); - let random_value: u64 = rng.gen_range(0..total_weight); - - let mut cumulative_weight = 0; - for (i, &weight) in self.cfg.party_weights.iter().enumerate() { - cumulative_weight += weight; - if random_value < cumulative_weight { - return Ok(i as u64); - } - } - Err(BallotError::LeaderElection) - } - fn get_value(&self) -> V { self.value_selector.select(&self.parties_voted_before) } @@ -286,7 +308,7 @@ impl> Party { )); } - if routing.sender != self.get_leader()? { + if routing.sender != self.cfg.leader { return Err(BallotError::InvalidState("Invalid leader in Msg1a".into())); } @@ -332,7 +354,7 @@ impl> Party { )); } - if routing.sender != self.get_leader()? { + if routing.sender != self.cfg.leader { return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); } @@ -419,7 +441,7 @@ impl> Party { "Cannot launch 1a, incorrect state".into(), )); } - if self.get_leader()? == self.id { + if self.cfg.leader == self.id { self.msg_out_sender .send(MessageWire { content_bytes: serde_json::to_vec(&Message1aContent { @@ -459,7 +481,7 @@ impl> Party { "Cannot launch 2a, incorrect state".into(), )); } - if self.get_leader()? == self.id { + if self.cfg.leader == self.id { self.msg_out_sender .send(MessageWire { content_bytes: serde_json::to_vec(&Message2aContent { @@ -560,31 +582,33 @@ mod tests { } #[test] - fn test_get_leader_simple_case() { - let cfg = BPConConfig { - party_weights: vec![1, 1, 1], // Simple case with equal weights - threshold: 2, - }; + fn test_compute_leader_weighted_case() { + let party_weights = vec![1, 2, 7]; // Weighted case - let party = Party::::new(0, cfg, MockValueSelector).0; + let mut leader_counts = vec![0; 3]; + let iterations = 10_000; - // Call get_leader multiple times to check distribution - let leader = party.get_leader().unwrap(); - assert!(leader <= 2, "Leader index out of bounds"); + for _ in 0..iterations { + let leader = compute_leader(party_weights.clone()).unwrap(); + leader_counts[leader as usize] += 1; + } + + // With 1:2:7 weights, the third party (index 2) should be selected the most frequently + println!("Leader selection counts: {:?}", leader_counts); + + assert!(leader_counts[2] > leader_counts[1]); + assert!(leader_counts[1] > leader_counts[0]); } #[test] - fn test_get_leader_weighted() { - let cfg = BPConConfig { - party_weights: vec![1, 2, 3], // Weighted case - threshold: 4, - }; + fn test_compute_leader_zero_weights() { + let party_weights = vec![0, 0, 0]; + let result = compute_leader(party_weights); - let party = Party::::new(0, cfg, MockValueSelector).0; - - // Call get_leader multiple times to check distribution - let leader = party.get_leader().unwrap(); - assert!(leader <= 2, "Leader index out of bounds"); + match result { + Err(LeaderElectionError::ZeroWeightSum) => {} // This is the expected outcome + _ => panic!("Expected ZeroWeightSum error"), + } } #[test] @@ -592,6 +616,7 @@ mod tests { let cfg = BPConConfig { party_weights: vec![1, 2, 3], threshold: 4, + leader: 1, }; let mut party = Party::::new(0, cfg, MockValueSelector).0; party.status = PartyStatus::Launched; @@ -599,8 +624,8 @@ mod tests { let msg = Message1aContent { ballot: 1 }; let routing = MessageRouting { - sender: 0, - receivers: vec![1], // Fix: use `receivers` instead of `receiver` + sender: 1, + receivers: vec![2, 3], is_broadcast: false, msg_type: ProtocolMessage::Msg1a, }; From b0c4aee6f3dc225663da798fb6af1bb35411bcfa Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Sun, 11 Aug 2024 01:09:46 +0300 Subject: [PATCH 16/21] feat: added full coverage on update state --- src/party.rs | 219 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) diff --git a/src/party.rs b/src/party.rs index 51d9f7f..6bc7d8e 100644 --- a/src/party.rs +++ b/src/party.rs @@ -640,4 +640,223 @@ mod tests { .unwrap(); assert_eq!(party.status, PartyStatus::Passed1a); } + + #[test] + fn test_update_state_msg1b() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], // Total weight is 6 + threshold: 4, // Threshold is 4 + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Passed1a; + party.ballot = 1; + + // First, send a 1b message from party 1 (weight 2) + let msg1 = Message1bContent { + ballot: 1, + last_ballot_voted: Some(0), + last_value_voted: Some(vec![1, 2, 3]), + }; + let routing1 = MessageRouting { + sender: 1, // Party 1 sends the message + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg1b, + }; + + let msg_wire1 = MessageWire { + content_bytes: serde_json::to_vec(&msg1).unwrap(), + routing: routing1, + }; + + party + .update_state(msg_wire1.content_bytes, msg_wire1.routing) + .unwrap(); + + // Now, send a 1b message from party 2 (weight 3) + let msg2 = Message1bContent { + ballot: 1, + last_ballot_voted: Some(0), + last_value_voted: Some(vec![1, 2, 3]), + }; + let routing2 = MessageRouting { + sender: 2, // Party 2 sends the message + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg1b, + }; + + let msg_wire2 = MessageWire { + content_bytes: serde_json::to_vec(&msg2).unwrap(), + routing: routing2, + }; + + party + .update_state(msg_wire2.content_bytes, msg_wire2.routing) + .unwrap(); + + // After both messages, the cumulative weight is 2 + 3 = 5, which exceeds the threshold + assert_eq!(party.status, PartyStatus::Passed1b); + } + + #[test] + fn test_update_state_msg2a() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Passed1b; + party.ballot = 1; + + let msg = Message2aContent { + ballot: 1, + value: serde_json::to_vec(&MockValue(42)).unwrap(), + }; + let routing = MessageRouting { + sender: 1, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2a, + }; + + let msg_wire = MessageWire { + content_bytes: serde_json::to_vec(&msg).unwrap(), + routing, + }; + + party + .update_state(msg_wire.content_bytes, msg_wire.routing) + .unwrap(); + + assert_eq!(party.status, PartyStatus::Passed2a); + } + + #[test] + fn test_update_state_msg2av() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Passed2a; + party.ballot = 1; + party.value_2a = Some(MockValue(42)); + + // Send first 2av message from party 2 (weight 3) + let msg1 = Message2avContent { + ballot: 1, + received_value: serde_json::to_vec(&MockValue(42)).unwrap(), + }; + let routing1 = MessageRouting { + sender: 2, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2av, + }; + + let msg_wire1 = MessageWire { + content_bytes: serde_json::to_vec(&msg1).unwrap(), + routing: routing1, + }; + + party + .update_state(msg_wire1.content_bytes, msg_wire1.routing) + .unwrap(); + + // Now send a second 2av message from party 1 (weight 2) + let msg2 = Message2avContent { + ballot: 1, + received_value: serde_json::to_vec(&MockValue(42)).unwrap(), + }; + let routing2 = MessageRouting { + sender: 1, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2av, + }; + + let msg_wire2 = MessageWire { + content_bytes: serde_json::to_vec(&msg2).unwrap(), + routing: routing2, + }; + + party + .update_state(msg_wire2.content_bytes, msg_wire2.routing) + .unwrap(); + + // The cumulative weight (3 + 2) should exceed the threshold of 4 + assert_eq!(party.status, PartyStatus::Passed2av); + } + + #[test] + fn test_update_state_msg2b() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 1, + }; + let mut party = Party::::new(0, cfg, MockValueSelector).0; + party.status = PartyStatus::Passed2av; + party.ballot = 1; + + // Simulate that both party 2 and party 1 already sent 2av messages + party.messages_2av_senders.insert(2); // Party 2 + party.messages_2av_senders.insert(1); // Party 1 + party.messages_2av_weight = 3; // Party 2 weight + + // Send first 2b message from party 2 (weight 3) + let msg1 = Message2bContent { ballot: 1 }; + let routing1 = MessageRouting { + sender: 2, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2b, + }; + + let msg_wire1 = MessageWire { + content_bytes: serde_json::to_vec(&msg1).unwrap(), + routing: routing1, + }; + + party + .update_state(msg_wire1.content_bytes, msg_wire1.routing) + .unwrap(); + + // Print the current state and weight + println!( + "After first Msg2b: Status = {:?}, 2b Weight = {}", + party.status, party.messages_2b_weight + ); + + // Now send a second 2b message from party 1 (weight 2) + let msg2 = Message2bContent { ballot: 1 }; + let routing2 = MessageRouting { + sender: 1, + receivers: vec![0], + is_broadcast: false, + msg_type: ProtocolMessage::Msg2b, + }; + + let msg_wire2 = MessageWire { + content_bytes: serde_json::to_vec(&msg2).unwrap(), + routing: routing2, + }; + + party + .update_state(msg_wire2.content_bytes, msg_wire2.routing) + .unwrap(); + + // Print the current state and weight + println!( + "After second Msg2b: Status = {:?}, 2b Weight = {}", + party.status, party.messages_2b_weight + ); + + // The cumulative weight (3 + 2) should exceed the threshold of 4 + assert_eq!(party.status, PartyStatus::Passed2b); + } } From 510aa327a1b0304b9c566b25934eab7fed7c3a4d Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Sun, 11 Aug 2024 01:26:35 +0300 Subject: [PATCH 17/21] feat: added more tests on party --- src/party.rs | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/party.rs b/src/party.rs index 6bc7d8e..8dae370 100644 --- a/src/party.rs +++ b/src/party.rs @@ -859,4 +859,79 @@ mod tests { // The cumulative weight (3 + 2) should exceed the threshold of 4 assert_eq!(party.status, PartyStatus::Passed2b); } + + #[test] + fn test_follow_event_launch1a() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 0, + }; + let (mut party, _msg_out_receiver, _msg_in_sender) = + Party::::new(0, cfg, MockValueSelector); + + party.status = PartyStatus::Launched; + party.ballot = 1; + + party + .follow_event(PartyEvent::Launch1a) + .expect("Failed to follow Launch1a event"); + + // If the party is the leader and in the Launched state, the event should trigger a message. + assert_eq!(party.status, PartyStatus::Launched); // Status remains Launched, as no state change expected here + } + + #[test] + fn test_ballot_reset_after_failure() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 0, + }; + let (mut party, _, _) = + Party::::new(0, cfg, MockValueSelector); + + party.status = PartyStatus::Failed; + party.ballot = 1; + + party.prepare_next_ballot(); + + // Check that state has been reset + assert_eq!(party.status, PartyStatus::Launched); + assert_eq!(party.ballot, 2); // Ballot number should have incremented + assert!(party.parties_voted_before.is_empty()); + assert_eq!(party.messages_1b_weight, 0); + assert!(party.messages_2av_senders.is_empty()); + assert_eq!(party.messages_2av_weight, 0); + assert!(party.messages_2b_senders.is_empty()); + assert_eq!(party.messages_2b_weight, 0); + } + + #[test] + fn test_follow_event_communication_failure() { + let cfg = BPConConfig { + party_weights: vec![1, 2, 3], + threshold: 4, + leader: 0, + }; + let (mut party, msg_out_receiver, _) = + Party::::new(0, cfg, MockValueSelector); + + party.status = PartyStatus::Launched; + party.ballot = 1; + + drop(msg_out_receiver); // Drop the receiver to simulate a communication failure + + let result = party.follow_event(PartyEvent::Launch1a); + + match result { + Err(BallotError::Communication(err_msg)) => { + assert_eq!( + err_msg, "Failed to send Msg1a", + "Expected specific communication error message" + ); + } + _ => panic!("Expected BallotError::Communication, got {:?}", result), + } + } } From 15aede6e75e0b07d928df6d344f927e482fd5ead Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 12 Aug 2024 12:24:35 +0300 Subject: [PATCH 18/21] feat: computing leader in a randomized manner deterministically using config-based seed --- src/error.rs | 2 + src/party.rs | 138 +++++++++++++++++++++++++++++---------------------- 2 files changed, 82 insertions(+), 58 deletions(-) diff --git a/src/error.rs b/src/error.rs index 1d031e5..9870ea3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,6 +7,7 @@ pub enum BallotError { MessageParsing(String), InvalidState(String), Communication(String), + LeaderElection(String), } impl fmt::Display for BallotError { @@ -15,6 +16,7 @@ impl fmt::Display for BallotError { BallotError::MessageParsing(ref err) => write!(f, "Message parsing error: {}", err), BallotError::InvalidState(ref err) => write!(f, "Invalid state error: {}", err), BallotError::Communication(ref err) => write!(f, "Communication error: {}", err), + BallotError::LeaderElection(ref err) => write!(f, "Leader election error: {}", err), } } } diff --git a/src/party.rs b/src/party.rs index 8dae370..3108696 100644 --- a/src/party.rs +++ b/src/party.rs @@ -6,9 +6,13 @@ use crate::message::{ MessageRouting, MessageWire, ProtocolMessage, }; use crate::{Value, ValueSelector}; +use rand::prelude::StdRng; +use rand::prelude::*; use rand::Rng; use std::cmp::PartialEq; +use std::collections::hash_map::DefaultHasher; use std::collections::{HashMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::sync::mpsc::{channel, Receiver, Sender}; /// BPCon configuration. Includes ballot time bounds and other stuff. @@ -18,8 +22,62 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, - pub leader: u64, // TODO: define other config fields. + /// Leader of the ballot, computed using seed obtained from config. + leader: u64, +} + +impl BPConConfig { + /// Create new config instance. + pub fn new(party_weights: Vec, threshold: u128) -> Self { + let mut cfg = Self { + party_weights, + threshold, + leader: 0, + }; + cfg.leader = cfg.compute_leader().unwrap(); + + cfg + } + + /// Compute leader in a weighed randomized manner. + /// Uses seed from the config, making it deterministic. + fn compute_leader(&self) -> Result { + let seed = self.compute_seed(); + + let total_weight: u64 = self.party_weights.iter().sum(); + if total_weight == 0 { + return Err(BallotError::LeaderElection("Zero weight sum".into())); + } + + // Use the seed from the config to create a deterministic random number generator. + let mut rng = StdRng::seed_from_u64(seed); + + let random_value: u64 = rng.gen_range(0..total_weight); + + let mut cumulative_weight = 0; + for (i, &weight) in self.party_weights.iter().enumerate() { + cumulative_weight += weight; + if random_value < cumulative_weight { + return Ok(i as u64); + } + } + Err(BallotError::LeaderElection("Election failed".into())) + } + + /// Compute seed for randomized leader election. + fn compute_seed(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + + // Hash each field that should contribute to the seed + self.party_weights.hash(&mut hasher); + self.threshold.hash(&mut hasher); + + // You can add more fields as needed + + // Generate the seed from the hash + hasher.finish() + } } /// Party status defines the statuses of the ballot for the particular participant @@ -111,46 +169,6 @@ pub struct Party> { messages_2b_weight: u128, } -#[derive(Debug)] -pub enum LeaderElectionError { - ZeroWeightSum, - ElectionFailed, -} - -impl std::fmt::Display for LeaderElectionError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match *self { - LeaderElectionError::ZeroWeightSum => { - write!(f, "party weights sum must be positive value") - } - LeaderElectionError::ElectionFailed => write!(f, "leader election failed"), - } - } -} - -impl std::error::Error for LeaderElectionError {} - -/// Compute leader in a weighed randomized manner. -/// Use this function for inter-ballot instantiation of config. -pub fn compute_leader(party_weights: Vec) -> Result { - let total_weight: u64 = party_weights.iter().sum(); - if total_weight == 0 { - return Err(LeaderElectionError::ZeroWeightSum); - } - - let mut rng = rand::thread_rng(); - let random_value: u64 = rng.gen_range(0..total_weight); - - let mut cumulative_weight = 0; - for (i, &weight) in party_weights.iter().enumerate() { - cumulative_weight += weight; - if random_value < cumulative_weight { - return Ok(i as u64); - } - } - Err(LeaderElectionError::ElectionFailed) -} - impl> Party { pub fn new( id: u64, @@ -582,33 +600,37 @@ mod tests { } #[test] - fn test_compute_leader_weighted_case() { + fn test_compute_leader_determinism() { let party_weights = vec![1, 2, 7]; // Weighted case + let threshold = 7; // example threshold - let mut leader_counts = vec![0; 3]; - let iterations = 10_000; - - for _ in 0..iterations { - let leader = compute_leader(party_weights.clone()).unwrap(); - leader_counts[leader as usize] += 1; - } + // Initialize the configuration once + let config = BPConConfig::new(party_weights.clone(), threshold); - // With 1:2:7 weights, the third party (index 2) should be selected the most frequently - println!("Leader selection counts: {:?}", leader_counts); + // Compute the leader multiple times + let leader1 = config.compute_leader().unwrap(); + let leader2 = config.compute_leader().unwrap(); + let leader3 = config.compute_leader().unwrap(); - assert!(leader_counts[2] > leader_counts[1]); - assert!(leader_counts[1] > leader_counts[0]); + // All leaders should be the same due to deterministic seed + assert_eq!( + leader1, leader2, + "Leaders should be consistent on repeated calls" + ); + assert_eq!( + leader2, leader3, + "Leaders should be consistent on repeated calls" + ); } #[test] + #[should_panic] fn test_compute_leader_zero_weights() { let party_weights = vec![0, 0, 0]; - let result = compute_leader(party_weights); + let threshold = 1; // example threshold - match result { - Err(LeaderElectionError::ZeroWeightSum) => {} // This is the expected outcome - _ => panic!("Expected ZeroWeightSum error"), - } + // Create the config, which will attempt to compute the leader + BPConConfig::new(party_weights, threshold); } #[test] From 0b838229abac235ae0abb562a7dd244aa662bd43 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 12 Aug 2024 14:41:35 +0300 Subject: [PATCH 19/21] fix: encapsulate message round state for 2av, 2b and improve event emit error handling --- src/party.rs | 156 +++++++++++++++++++++++++++++---------------------- 1 file changed, 89 insertions(+), 67 deletions(-) diff --git a/src/party.rs b/src/party.rs index 3108696..286b03a 100644 --- a/src/party.rs +++ b/src/party.rs @@ -11,6 +11,7 @@ use rand::prelude::*; use rand::Rng; use std::cmp::PartialEq; use std::collections::hash_map::DefaultHasher; +use std::collections::hash_map::Entry::Vacant; use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::sync::mpsc::{channel, Receiver, Sender}; @@ -22,9 +23,10 @@ pub struct BPConConfig { /// Threshold weight to define BFT quorum: should be > 2/3 of total weight pub threshold: u128, - // TODO: define other config fields. + /// Leader of the ballot, computed using seed obtained from config. leader: u64, + // TODO: define other config fields. } impl BPConConfig { @@ -106,6 +108,39 @@ pub(crate) enum PartyEvent { Finalize, } +/// A struct to keep track of senders and the cumulative weight of their messages. +struct MessageRoundState { + senders: HashSet, + weight: u128, +} + +impl MessageRoundState { + /// Creates a new instance of `MessageRoundState`. + fn new() -> Self { + Self { + senders: HashSet::new(), + weight: 0, + } + } + + /// Adds a sender and their corresponding weight. + fn add_sender(&mut self, sender: u64, weight: u128) { + self.senders.insert(sender); + self.weight += weight; + } + + /// Checks if the sender has already sent a message. + fn contains_sender(&self, sender: &u64) -> bool { + self.senders.contains(sender) + } + + /// Resets the state. + fn reset(&mut self) { + self.senders.clear(); + self.weight = 0; + } +} + /// Party of the BPCon protocol that executes ballot. /// /// The communication between party and external @@ -160,13 +195,11 @@ pub struct Party> { /// 2av round state /// - messages_2av_senders: HashSet, - messages_2av_weight: u128, + messages_2av_state: MessageRoundState, /// 2b round state /// - messages_2b_senders: HashSet, - messages_2b_weight: u128, + messages_2b_state: MessageRoundState, } impl> Party { @@ -195,10 +228,8 @@ impl> Party { parties_voted_before: HashMap::new(), messages_1b_weight: 0, value_2a: None, - messages_2av_senders: HashSet::new(), - messages_2av_weight: 0, - messages_2b_senders: HashSet::new(), - messages_2b_weight: 0, + messages_2av_state: MessageRoundState::new(), + messages_2b_state: MessageRoundState::new(), }, msg_out_receiver, msg_in_sender, @@ -251,48 +282,41 @@ impl> Party { } // TODO: Emit events to run ballot protocol according to the ballot configuration - if self.event_sender.send(PartyEvent::Launch1a).is_err() { + self.event_sender.send(PartyEvent::Launch1a).map_err(|_| { self.status = PartyStatus::Failed; - return Err(BallotError::Communication( - "Failed to send Launch1a event".into(), - )); - } - if self.event_sender.send(PartyEvent::Launch1b).is_err() { + BallotError::Communication("Failed to send Launch1a event".into()) + })?; + + self.event_sender.send(PartyEvent::Launch1b).map_err(|_| { self.status = PartyStatus::Failed; - return Err(BallotError::Communication( - "Failed to send Launch1b event".into(), - )); - } - if self.event_sender.send(PartyEvent::Launch2a).is_err() { + BallotError::Communication("Failed to send Launch1b event".into()) + })?; + + self.event_sender.send(PartyEvent::Launch2a).map_err(|_| { self.status = PartyStatus::Failed; - return Err(BallotError::Communication( - "Failed to send Launch2a event".into(), - )); - } - if self.event_sender.send(PartyEvent::Launch2av).is_err() { + BallotError::Communication("Failed to send Launch2a event".into()) + })?; + + self.event_sender.send(PartyEvent::Launch2av).map_err(|_| { self.status = PartyStatus::Failed; - return Err(BallotError::Communication( - "Failed to send Launch2av event".into(), - )); - } - if self.event_sender.send(PartyEvent::Launch2b).is_err() { + BallotError::Communication("Failed to send Launch2av event".into()) + })?; + + self.event_sender.send(PartyEvent::Launch2b).map_err(|_| { self.status = PartyStatus::Failed; - return Err(BallotError::Communication( - "Failed to send Launch2b event".into(), - )); - } - if self.event_sender.send(PartyEvent::Finalize).is_err() { + BallotError::Communication("Failed to send Launch2b event".into()) + })?; + + self.event_sender.send(PartyEvent::Finalize).map_err(|_| { self.status = PartyStatus::Failed; - return Err(BallotError::Communication( - "Failed to send Finalize event".into(), - )); - } + BallotError::Communication("Failed to send Finalize event".into()) + })?; } Ok(self.get_value_selected()) } - /// Prepare state before running a ballot + /// Prepare state before running a ballot. fn prepare_next_ballot(&mut self) { self.status = PartyStatus::None; self.ballot += 1; @@ -301,10 +325,8 @@ impl> Party { self.parties_voted_before = HashMap::new(); self.messages_1b_weight = 0; self.value_2a = None; - self.messages_2av_senders = HashSet::new(); - self.messages_2av_weight = 0; - self.messages_2b_senders = HashSet::new(); - self.messages_2b_weight = 0; + self.messages_2av_state.reset(); + self.messages_2b_state.reset(); // Cleaning channels while self.event_receiver.try_recv().is_ok() {} @@ -350,10 +372,9 @@ impl> Party { } } - if let std::collections::hash_map::Entry::Vacant(e) = - self.parties_voted_before.entry(routing.sender) - { + if let Vacant(e) = self.parties_voted_before.entry(routing.sender) { e.insert(msg.last_value_voted); + self.messages_1b_weight += self.cfg.party_weights[routing.sender as usize] as u128; @@ -414,12 +435,13 @@ impl> Party { )); } - if !self.messages_2av_senders.contains(&routing.sender) { - self.messages_2av_senders.insert(routing.sender); - self.messages_2av_weight += - self.cfg.party_weights[routing.sender as usize] as u128; + if !self.messages_2av_state.contains_sender(&routing.sender) { + self.messages_2av_state.add_sender( + routing.sender, + self.cfg.party_weights[routing.sender as usize] as u128, + ); - if self.messages_2av_weight > self.cfg.threshold { + if self.messages_2av_state.weight > self.cfg.threshold { self.status = PartyStatus::Passed2av; } } @@ -434,14 +456,15 @@ impl> Party { )); } - if self.messages_2av_senders.contains(&routing.sender) - && !self.messages_2b_senders.contains(&routing.sender) + if self.messages_2av_state.contains_sender(&routing.sender) + && !self.messages_2b_state.contains_sender(&routing.sender) { - self.messages_2b_senders.insert(routing.sender); - self.messages_2b_weight += - self.cfg.party_weights[routing.sender as usize] as u128; + self.messages_2b_state.add_sender( + routing.sender, + self.cfg.party_weights[routing.sender as usize] as u128, + ); - if self.messages_2b_weight > self.cfg.threshold { + if self.messages_2b_state.weight > self.cfg.threshold { self.status = PartyStatus::Passed2b; } } @@ -826,9 +849,8 @@ mod tests { party.ballot = 1; // Simulate that both party 2 and party 1 already sent 2av messages - party.messages_2av_senders.insert(2); // Party 2 - party.messages_2av_senders.insert(1); // Party 1 - party.messages_2av_weight = 3; // Party 2 weight + party.messages_2av_state.add_sender(1, 1); + party.messages_2av_state.add_sender(2, 2); // Send first 2b message from party 2 (weight 3) let msg1 = Message2bContent { ballot: 1 }; @@ -851,7 +873,7 @@ mod tests { // Print the current state and weight println!( "After first Msg2b: Status = {:?}, 2b Weight = {}", - party.status, party.messages_2b_weight + party.status, party.messages_2b_state.weight ); // Now send a second 2b message from party 1 (weight 2) @@ -875,7 +897,7 @@ mod tests { // Print the current state and weight println!( "After second Msg2b: Status = {:?}, 2b Weight = {}", - party.status, party.messages_2b_weight + party.status, party.messages_2b_state.weight ); // The cumulative weight (3 + 2) should exceed the threshold of 4 @@ -923,10 +945,10 @@ mod tests { assert_eq!(party.ballot, 2); // Ballot number should have incremented assert!(party.parties_voted_before.is_empty()); assert_eq!(party.messages_1b_weight, 0); - assert!(party.messages_2av_senders.is_empty()); - assert_eq!(party.messages_2av_weight, 0); - assert!(party.messages_2b_senders.is_empty()); - assert_eq!(party.messages_2b_weight, 0); + assert!(party.messages_2av_state.senders.is_empty()); + assert_eq!(party.messages_2av_state.weight, 0); + assert!(party.messages_2b_state.senders.is_empty()); + assert_eq!(party.messages_2b_state.weight, 0); } #[test] From 90d594600ae674c81a665efc94d093f10a5403f0 Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 12 Aug 2024 19:12:45 +0300 Subject: [PATCH 20/21] feat: messages with rkyv, value with bincode --- Cargo.toml | 5 +- src/error.rs | 2 + src/lib.rs | 4 +- src/message.rs | 27 ++++++--- src/party.rs | 158 ++++++++++++++++++++++++++++++++----------------- 5 files changed, 130 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4b972ac..4383852 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0.204", features = ["derive"] } -serde_json = "1.0.122" rand = "0.9.0-alpha.2" log = "0.4.22" +rkyv = { version = "0.7.44", features = ["validation"]} +serde = { version = "1.0.207", features = ["derive"] } +bincode = "1.3.3" diff --git a/src/error.rs b/src/error.rs index 9870ea3..b9d84b8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,6 +5,7 @@ use std::fmt; #[derive(Debug)] pub enum BallotError { MessageParsing(String), + ValueParsing(String), InvalidState(String), Communication(String), LeaderElection(String), @@ -14,6 +15,7 @@ impl fmt::Display for BallotError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { BallotError::MessageParsing(ref err) => write!(f, "Message parsing error: {}", err), + BallotError::ValueParsing(ref err) => write!(f, "Value parsing error: {}", err), BallotError::InvalidState(ref err) => write!(f, "Invalid state error: {}", err), BallotError::Communication(ref err) => write!(f, "Communication error: {}", err), BallotError::LeaderElection(ref err) => write!(f, "Leader election error: {}", err), diff --git a/src/lib.rs b/src/lib.rs index 63a7bd2..afa03b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,10 +14,10 @@ pub trait Value: Eq + Serialize + for<'a> Deserialize<'a> + Clone {} /// Party can not vote for different values, even in different ballots. pub trait ValueSelector { /// Verifies if a value is selected correctly. Accepts 2b messages from parties. - fn verify(&self, v: &V, m: &HashMap>>) -> bool; + fn verify(&self, v: &V, m: &HashMap>) -> bool; /// Select value depending on inner conditions. Accepts 2b messages from parties. - fn select(&self, m: &HashMap>>) -> V; + fn select(&self, m: &HashMap>) -> V; // TODO: add other fields to update selector state. } diff --git a/src/message.rs b/src/message.rs index 9e9eb3c..886e2f8 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,11 +1,11 @@ //! Definition of the BPCon messages. -use serde::{Deserialize, Serialize}; +use rkyv::{AlignedVec, Archive, Deserialize, Serialize}; /// Message ready for transfer. pub struct MessageWire { /// Serialized message contents. - pub content_bytes: Vec, + pub content_bytes: AlignedVec, /// Routing information. pub routing: MessageRouting, } @@ -31,31 +31,44 @@ pub enum ProtocolMessage { Msg2b, } -#[derive(Serialize, Deserialize, Clone, Debug)] +// Value in messages is stored in serialized format, i.e bytes in order to omit +// strict restriction for `Value` trait to be [de]serializable only with `rkyv`. + +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] pub struct Message1aContent { pub ballot: u64, } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] pub struct Message1bContent { pub ballot: u64, pub last_ballot_voted: Option, pub last_value_voted: Option>, } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] pub struct Message2aContent { pub ballot: u64, pub value: Vec, } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] pub struct Message2avContent { pub ballot: u64, pub received_value: Vec, } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Archive, Deserialize, Serialize, Debug, Clone)] +#[archive(compare(PartialEq), check_bytes)] +#[archive_attr(derive(Debug))] pub struct Message2bContent { pub ballot: u64, } diff --git a/src/party.rs b/src/party.rs index 286b03a..4d8e920 100644 --- a/src/party.rs +++ b/src/party.rs @@ -9,6 +9,7 @@ use crate::{Value, ValueSelector}; use rand::prelude::StdRng; use rand::prelude::*; use rand::Rng; +use rkyv::{AlignedVec, Deserialize, Infallible}; use std::cmp::PartialEq; use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::Entry::Vacant; @@ -180,13 +181,13 @@ pub struct Party> { last_ballot_voted: Option, /// Last value for which party submitted 2b message - last_value_voted: Option>, + last_value_voted: Option, /// Local round fields /// 1b round state /// - parties_voted_before: HashMap>>, // id <-> value + parties_voted_before: HashMap>, // id <-> value messages_1b_weight: u128, /// 2a round state @@ -336,11 +337,18 @@ impl> Party { } /// Update party's state based on message type. - fn update_state(&mut self, m: Vec, routing: MessageRouting) -> Result<(), BallotError> { + fn update_state(&mut self, m: AlignedVec, routing: MessageRouting) -> Result<(), BallotError> { match routing.msg_type { ProtocolMessage::Msg1a => { - let msg: Message1aContent = serde_json::from_slice(m.as_slice()) - .map_err(|_| BallotError::MessageParsing("Failed to parse Msg1a".into()))?; + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message1aContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; if msg.ballot != self.ballot { return Err(BallotError::InvalidState( @@ -355,8 +363,15 @@ impl> Party { self.status = PartyStatus::Passed1a; } ProtocolMessage::Msg1b => { - let msg: Message1bContent = serde_json::from_slice(m.as_slice()) - .map_err(|_| BallotError::MessageParsing("Failed to parse Msg1b".into()))?; + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message1bContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; if msg.ballot != self.ballot { return Err(BallotError::InvalidState( @@ -373,7 +388,14 @@ impl> Party { } if let Vacant(e) = self.parties_voted_before.entry(routing.sender) { - e.insert(msg.last_value_voted); + let value: Option = match msg.last_value_voted { + Some(ref data) => Some(bincode::deserialize(data).map_err(|err| { + BallotError::ValueParsing(format!("Deserialization error: {:?}", err)) + })?), + None => None, + }; + + e.insert(value); self.messages_1b_weight += self.cfg.party_weights[routing.sender as usize] as u128; @@ -384,8 +406,15 @@ impl> Party { } } ProtocolMessage::Msg2a => { - let msg: Message2aContent = serde_json::from_slice(m.as_slice()) - .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2a".into()))?; + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message2aContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; if msg.ballot != self.ballot { return Err(BallotError::InvalidState( @@ -397,10 +426,9 @@ impl> Party { return Err(BallotError::InvalidState("Invalid leader in Msg2a".into())); } - let value_received: V = - serde_json::from_slice(msg.value.as_slice()).map_err(|_| { - BallotError::MessageParsing("Failed to parse value in Msg2a".into()) - })?; + let value_received = bincode::deserialize(&msg.value[..]).map_err(|err| { + BallotError::ValueParsing(format!("Failed to parse value in Msg2a: {:?}", err)) + })?; if self .value_selector @@ -415,18 +443,27 @@ impl> Party { } } ProtocolMessage::Msg2av => { - let msg: Message2avContent = serde_json::from_slice(m.as_slice()) - .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2av".into()))?; + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message2avContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; if msg.ballot != self.ballot { return Err(BallotError::InvalidState( "Ballot number mismatch in Msg2av".into(), )); } - - let value_received: V = serde_json::from_slice(msg.received_value.as_slice()) - .map_err(|_| { - BallotError::MessageParsing("Failed to parse value in Msg2av".into()) + let value_received: V = + bincode::deserialize(&msg.received_value[..]).map_err(|err| { + BallotError::ValueParsing(format!( + "Failed to parse value in Msg2av: {:?}", + err + )) })?; if value_received != self.value_2a.clone().unwrap() { @@ -447,8 +484,15 @@ impl> Party { } } ProtocolMessage::Msg2b => { - let msg: Message2bContent = serde_json::from_slice(m.as_slice()) - .map_err(|_| BallotError::MessageParsing("Failed to parse Msg2b".into()))?; + let archived = + rkyv::check_archived_root::(&m[..]).map_err(|err| { + BallotError::MessageParsing(format!("Validation error: {:?}", err)) + })?; + + let msg: Message2bContent = + archived.deserialize(&mut Infallible).map_err(|err| { + BallotError::MessageParsing(format!("Deserialization error: {:?}", err)) + })?; if msg.ballot != self.ballot { return Err(BallotError::InvalidState( @@ -485,7 +529,7 @@ impl> Party { if self.cfg.leader == self.id { self.msg_out_sender .send(MessageWire { - content_bytes: serde_json::to_vec(&Message1aContent { + content_bytes: rkyv::to_bytes::<_, 256>(&Message1aContent { ballot: self.ballot, }) .map_err(|_| { @@ -504,10 +548,20 @@ impl> Party { } self.msg_out_sender .send(MessageWire { - content_bytes: serde_json::to_vec(&Message1bContent { + content_bytes: rkyv::to_bytes::<_, 256>(&Message1bContent { ballot: self.ballot, last_ballot_voted: self.last_ballot_voted, - last_value_voted: self.last_value_voted.clone(), + last_value_voted: self + .last_value_voted + .clone() + .map(|inner_data| { + bincode::serialize(&inner_data).map_err(|_| { + BallotError::ValueParsing( + "Failed to serialize value".into(), + ) + }) + }) + .transpose()?, }) .map_err(|_| { BallotError::MessageParsing("Failed to serialize Msg1b".into()) @@ -525,12 +579,10 @@ impl> Party { if self.cfg.leader == self.id { self.msg_out_sender .send(MessageWire { - content_bytes: serde_json::to_vec(&Message2aContent { + content_bytes: rkyv::to_bytes::<_, 256>(&Message2aContent { ballot: self.ballot, - value: serde_json::to_vec(&self.get_value()).map_err(|_| { - BallotError::MessageParsing( - "Failed to serialize value for Msg2a".into(), - ) + value: bincode::serialize(&self.get_value()).map_err(|_| { + BallotError::ValueParsing("Failed to serialize value".into()) })?, }) .map_err(|_| { @@ -549,14 +601,11 @@ impl> Party { } self.msg_out_sender .send(MessageWire { - content_bytes: serde_json::to_vec(&Message2avContent { + content_bytes: rkyv::to_bytes::<_, 256>(&Message2avContent { ballot: self.ballot, - received_value: serde_json::to_vec(&self.value_2a.clone().unwrap()) - .map_err(|_| { - BallotError::MessageParsing( - "Failed to serialize value for Msg2av".into(), - ) - })?, + received_value: bincode::serialize(&self.value_2a.clone()).map_err( + |_| BallotError::ValueParsing("Failed to serialize value".into()), + )?, }) .map_err(|_| { BallotError::MessageParsing("Failed to serialize Msg2av".into()) @@ -573,7 +622,7 @@ impl> Party { } self.msg_out_sender .send(MessageWire { - content_bytes: serde_json::to_vec(&Message2bContent { + content_bytes: rkyv::to_bytes::<_, 256>(&Message2bContent { ballot: self.ballot, }) .map_err(|_| { @@ -600,11 +649,10 @@ impl> Party { mod tests { use super::*; - use serde::{Deserialize, Serialize}; use std::collections::HashMap; // Mock implementation of Value - #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] + #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] struct MockValue(u64); // Simple mock type wrapping an integer impl Value for MockValue {} @@ -613,11 +661,11 @@ mod tests { struct MockValueSelector; impl ValueSelector for MockValueSelector { - fn verify(&self, _v: &MockValue, _m: &HashMap>>) -> bool { + fn verify(&self, _v: &MockValue, _m: &HashMap>) -> bool { true // For testing, always return true } - fn select(&self, _m: &HashMap>>) -> MockValue { + fn select(&self, _m: &HashMap>) -> MockValue { MockValue(42) // For testing, always return the same value } } @@ -676,7 +724,7 @@ mod tests { }; let msg_wire = MessageWire { - content_bytes: serde_json::to_vec(&msg).unwrap(), + content_bytes: rkyv::to_bytes::<_, 256>(&msg).unwrap(), routing, }; @@ -701,7 +749,7 @@ mod tests { let msg1 = Message1bContent { ballot: 1, last_ballot_voted: Some(0), - last_value_voted: Some(vec![1, 2, 3]), + last_value_voted: bincode::serialize(&MockValue(42)).ok(), }; let routing1 = MessageRouting { sender: 1, // Party 1 sends the message @@ -711,7 +759,7 @@ mod tests { }; let msg_wire1 = MessageWire { - content_bytes: serde_json::to_vec(&msg1).unwrap(), + content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), routing: routing1, }; @@ -723,7 +771,7 @@ mod tests { let msg2 = Message1bContent { ballot: 1, last_ballot_voted: Some(0), - last_value_voted: Some(vec![1, 2, 3]), + last_value_voted: bincode::serialize(&MockValue(42)).ok(), }; let routing2 = MessageRouting { sender: 2, // Party 2 sends the message @@ -733,7 +781,7 @@ mod tests { }; let msg_wire2 = MessageWire { - content_bytes: serde_json::to_vec(&msg2).unwrap(), + content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), routing: routing2, }; @@ -758,7 +806,7 @@ mod tests { let msg = Message2aContent { ballot: 1, - value: serde_json::to_vec(&MockValue(42)).unwrap(), + value: bincode::serialize(&MockValue(42)).unwrap(), }; let routing = MessageRouting { sender: 1, @@ -768,7 +816,7 @@ mod tests { }; let msg_wire = MessageWire { - content_bytes: serde_json::to_vec(&msg).unwrap(), + content_bytes: rkyv::to_bytes::<_, 256>(&msg).unwrap(), routing, }; @@ -794,7 +842,7 @@ mod tests { // Send first 2av message from party 2 (weight 3) let msg1 = Message2avContent { ballot: 1, - received_value: serde_json::to_vec(&MockValue(42)).unwrap(), + received_value: bincode::serialize(&MockValue(42)).unwrap(), }; let routing1 = MessageRouting { sender: 2, @@ -804,7 +852,7 @@ mod tests { }; let msg_wire1 = MessageWire { - content_bytes: serde_json::to_vec(&msg1).unwrap(), + content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), routing: routing1, }; @@ -815,7 +863,7 @@ mod tests { // Now send a second 2av message from party 1 (weight 2) let msg2 = Message2avContent { ballot: 1, - received_value: serde_json::to_vec(&MockValue(42)).unwrap(), + received_value: bincode::serialize(&MockValue(42)).unwrap(), }; let routing2 = MessageRouting { sender: 1, @@ -825,7 +873,7 @@ mod tests { }; let msg_wire2 = MessageWire { - content_bytes: serde_json::to_vec(&msg2).unwrap(), + content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), routing: routing2, }; @@ -862,7 +910,7 @@ mod tests { }; let msg_wire1 = MessageWire { - content_bytes: serde_json::to_vec(&msg1).unwrap(), + content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), routing: routing1, }; @@ -886,7 +934,7 @@ mod tests { }; let msg_wire2 = MessageWire { - content_bytes: serde_json::to_vec(&msg2).unwrap(), + content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), routing: routing2, }; From a512c514d2a1ca37a63921f36885c69af98d6fea Mon Sep 17 00:00:00 2001 From: Nikita Masych Date: Mon, 12 Aug 2024 19:13:12 +0300 Subject: [PATCH 21/21] fix: rename MessageWire to MessagePacket --- src/message.rs | 2 +- src/party.rs | 34 +++++++++++++++++----------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/message.rs b/src/message.rs index 886e2f8..daa86ab 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,7 +3,7 @@ use rkyv::{AlignedVec, Archive, Deserialize, Serialize}; /// Message ready for transfer. -pub struct MessageWire { +pub struct MessagePacket { /// Serialized message contents. pub content_bytes: AlignedVec, /// Routing information. diff --git a/src/party.rs b/src/party.rs index 4d8e920..85f6b6f 100644 --- a/src/party.rs +++ b/src/party.rs @@ -3,7 +3,7 @@ use crate::error::BallotError; use crate::message::{ Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent, - MessageRouting, MessageWire, ProtocolMessage, + MessagePacket, MessageRouting, ProtocolMessage, }; use crate::{Value, ValueSelector}; use rand::prelude::StdRng; @@ -158,8 +158,8 @@ pub struct Party> { pub id: u64, /// Communication queues. - msg_in_receiver: Receiver, - msg_out_sender: Sender, + msg_in_receiver: Receiver, + msg_out_sender: Sender, /// Query to receive and send events that run ballot protocol event_receiver: Receiver, @@ -208,7 +208,7 @@ impl> Party { id: u64, cfg: BPConConfig, value_selector: VS, - ) -> (Self, Receiver, Sender) { + ) -> (Self, Receiver, Sender) { let (event_sender, event_receiver) = channel(); let (msg_in_sender, msg_in_receiver) = channel(); let (msg_out_sender, msg_out_receiver) = channel(); @@ -528,7 +528,7 @@ impl> Party { } if self.cfg.leader == self.id { self.msg_out_sender - .send(MessageWire { + .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message1aContent { ballot: self.ballot, }) @@ -547,7 +547,7 @@ impl> Party { )); } self.msg_out_sender - .send(MessageWire { + .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message1bContent { ballot: self.ballot, last_ballot_voted: self.last_ballot_voted, @@ -578,7 +578,7 @@ impl> Party { } if self.cfg.leader == self.id { self.msg_out_sender - .send(MessageWire { + .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message2aContent { ballot: self.ballot, value: bincode::serialize(&self.get_value()).map_err(|_| { @@ -600,7 +600,7 @@ impl> Party { )); } self.msg_out_sender - .send(MessageWire { + .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message2avContent { ballot: self.ballot, received_value: bincode::serialize(&self.value_2a.clone()).map_err( @@ -621,7 +621,7 @@ impl> Party { )); } self.msg_out_sender - .send(MessageWire { + .send(MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&Message2bContent { ballot: self.ballot, }) @@ -723,7 +723,7 @@ mod tests { msg_type: ProtocolMessage::Msg1a, }; - let msg_wire = MessageWire { + let msg_wire = MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&msg).unwrap(), routing, }; @@ -758,7 +758,7 @@ mod tests { msg_type: ProtocolMessage::Msg1b, }; - let msg_wire1 = MessageWire { + let msg_wire1 = MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), routing: routing1, }; @@ -780,7 +780,7 @@ mod tests { msg_type: ProtocolMessage::Msg1b, }; - let msg_wire2 = MessageWire { + let msg_wire2 = MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), routing: routing2, }; @@ -815,7 +815,7 @@ mod tests { msg_type: ProtocolMessage::Msg2a, }; - let msg_wire = MessageWire { + let msg_wire = MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&msg).unwrap(), routing, }; @@ -851,7 +851,7 @@ mod tests { msg_type: ProtocolMessage::Msg2av, }; - let msg_wire1 = MessageWire { + let msg_wire1 = MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), routing: routing1, }; @@ -872,7 +872,7 @@ mod tests { msg_type: ProtocolMessage::Msg2av, }; - let msg_wire2 = MessageWire { + let msg_wire2 = MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), routing: routing2, }; @@ -909,7 +909,7 @@ mod tests { msg_type: ProtocolMessage::Msg2b, }; - let msg_wire1 = MessageWire { + let msg_wire1 = MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&msg1).unwrap(), routing: routing1, }; @@ -933,7 +933,7 @@ mod tests { msg_type: ProtocolMessage::Msg2b, }; - let msg_wire2 = MessageWire { + let msg_wire2 = MessagePacket { content_bytes: rkyv::to_bytes::<_, 256>(&msg2).unwrap(), routing: routing2, };