Skip to content

Commit

Permalink
fix: resolved end to end ballot -> working
Browse files Browse the repository at this point in the history
  • Loading branch information
NikitaMasych committed Aug 29, 2024
1 parent b909454 commit 427356b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 40 deletions.
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ impl BPConConfig {
party_weights,
threshold,
// TODO: deduce actually good defaults
launch_timeout: Duration::from_secs(1),
launch_timeout: Duration::from_secs(0),
launch1a_timeout: Duration::from_secs(5),
launch1b_timeout: Duration::from_secs(10),
launch2a_timeout: Duration::from_secs(15),
launch2av_timeout: Duration::from_secs(20),
launch2b_timeout: Duration::from_secs(25),
finalize_timeout: Duration::from_secs(30),
grace_period: Duration::from_secs(2),
grace_period: Duration::from_secs(1),
}
}
}
64 changes: 26 additions & 38 deletions src/party.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::message::{
MessagePacket, MessageRoundState, ProtocolMessage,
};
use crate::{Value, ValueSelector};
use log::warn;
use log::{debug, warn};
use std::cmp::PartialEq;
use std::collections::hash_map::Entry::Vacant;
use std::collections::HashMap;
Expand Down Expand Up @@ -269,6 +269,7 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
msg = self.msg_in_receiver.recv() => {
sleep(self.cfg.grace_period).await;
if let Some(msg) = msg {
debug!("Party {} received {} from party {}", self.id, msg.routing.msg_type, msg.routing.sender);
if let Err(err) = self.update_state(&msg) {
// Shouldn't fail the party, since invalid message
// may be sent by anyone.
Expand Down Expand Up @@ -532,12 +533,12 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
.into());
}

let content = &Message1aContent {
ballot: self.ballot,
};
let msg = content.pack(self.id)?;

if self.leader == self.id {
let content = &Message1aContent {
ballot: self.ballot,
};
let msg = content.pack(self.id)?;

self.msg_out_sender
.send(msg)
.map_err(|err| FailedToSendMessage(err.to_string()))?;
Expand Down Expand Up @@ -581,20 +582,22 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
}
.into());
}
if self.leader == self.id {
let value = bincode::serialize(&self.get_value())
.map_err(|err| SerializationError::Value(err.to_string()))?;

let value = bincode::serialize(&self.get_value())
.map_err(|err| SerializationError::Value(err.to_string()))?;

let content = &Message2aContent {
ballot: self.ballot,
value,
};
let msg = content.pack(self.id)?;
let content = &Message2aContent {
ballot: self.ballot,
value,
};
let msg = content.pack(self.id)?;

if self.leader == self.id {
self.msg_out_sender
.send(msg)
.map_err(|err| FailedToSendMessage(err.to_string()))?;

self.value_2a = Some(self.get_value());
self.status = PartyStatus::Passed2a;
}
}
PartyEvent::Launch2av => {
Expand All @@ -606,7 +609,7 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
.into());
}

let received_value = bincode::serialize(&self.value_2a.clone())
let received_value = bincode::serialize(&self.value_2a.clone().unwrap())
.map_err(|err| SerializationError::Value(err.to_string()))?;

let content = &Message2avContent {
Expand Down Expand Up @@ -917,25 +920,13 @@ pub(crate) mod tests {
}

#[tokio::test]
#[ignore] // this is unfinished test
async fn test_end_to_end_ballot() {
// Configuration for the parties
let cfg = BPConConfig {
party_weights: vec![1, 2, 3, 4], // Total weight is 10
threshold: 7, // 2/3 of total weight is ~6.67, so we set 7 as threshold
launch_timeout: Duration::from_secs(1),
launch1a_timeout: Duration::from_secs(5),
launch1b_timeout: Duration::from_secs(5),
launch2a_timeout: Duration::from_secs(5),
launch2av_timeout: Duration::from_secs(5),
launch2b_timeout: Duration::from_secs(5),
finalize_timeout: Duration::from_secs(5),
grace_period: Duration::from_secs(2),
};
let cfg = BPConConfig::with_default_timeouts(vec![1, 1, 1, 1], 2);

// ValueSelector and LeaderElector instances
let value_selector = MockValueSelector;
let leader_elector = Box::new(DefaultLeaderElector {});
let leader_elector = Box::new(DefaultLeaderElector::new());

// Create 4 parties
let (mut party0, msg_out_receiver0, msg_in_sender0) =
Expand Down Expand Up @@ -973,14 +964,11 @@ pub(crate) mod tests {
let (value_sender2, value_receiver2) = tokio::sync::oneshot::channel();
let (value_sender3, value_receiver3) = tokio::sync::oneshot::channel();

let leader = party0.elector.elect_leader(&party0).unwrap();
println!("Leader: {leader}");

// Launch ballot tasks for each party
let ballot_task0 = tokio::spawn(async move {
match party0.launch_ballot().await {
Ok(Some(value)) => {
let _ = value_sender0.send(value);
value_sender0.send(value).unwrap();
}
Ok(None) => {
eprintln!("Party 0: No value was selected");
Expand All @@ -994,7 +982,7 @@ pub(crate) mod tests {
let ballot_task1 = tokio::spawn(async move {
match party1.launch_ballot().await {
Ok(Some(value)) => {
let _ = value_sender1.send(value);
value_sender1.send(value).unwrap();
}
Ok(None) => {
eprintln!("Party 1: No value was selected");
Expand All @@ -1008,7 +996,7 @@ pub(crate) mod tests {
let ballot_task2 = tokio::spawn(async move {
match party2.launch_ballot().await {
Ok(Some(value)) => {
let _ = value_sender2.send(value);
value_sender2.send(value).unwrap();
}
Ok(None) => {
eprintln!("Party 2: No value was selected");
Expand All @@ -1022,7 +1010,7 @@ pub(crate) mod tests {
let ballot_task3 = tokio::spawn(async move {
match party3.launch_ballot().await {
Ok(Some(value)) => {
let _ = value_sender3.send(value);
value_sender3.send(value).unwrap();
}
Ok(None) => {
eprintln!("Party 3: No value was selected");
Expand Down Expand Up @@ -1054,7 +1042,7 @@ pub(crate) mod tests {
// Broadcast the message to all other parties
for (j, sender) in senders.iter().enumerate() {
if i != j {
let _ = sender.send(msg.clone());
sender.send(msg.clone()).unwrap();
}
}
}
Expand Down

0 comments on commit 427356b

Please sign in to comment.