Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: primary implementation of the protocol #6

Merged
merged 21 commits into from
Aug 15, 2024
Merged
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: cargo fmt
NikitaMasych committed Aug 10, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 11ac9669f8157202963653297db92ccc96dde1fc
204 changes: 142 additions & 62 deletions src/party.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
//! Definition of the BPCon participant structure.

use crate::error::BallotError;
use crate::message::{
Message1aContent, Message1bContent, Message2aContent, Message2avContent, Message2bContent,
MessageRouting, MessageWire, ProtocolMessage,
};
use crate::{Value, ValueSelector};
use crate::error::BallotError;
use rand::Rng;
use std::cmp::PartialEq;
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::{channel, Receiver, Sender};
use rand::Rng;

/// BPCon configuration. Includes ballot time bounds and other stuff.
pub struct BPConConfig {
@@ -23,7 +23,7 @@ pub struct BPConConfig {

/// Party status defines the statuses of the ballot for the particular participant
/// depending on local calculations.
#[derive(PartialEq,Debug)]
#[derive(PartialEq, Debug)]
pub(crate) enum PartyStatus {
None,
Launched,
@@ -213,27 +213,39 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
// TODO: Emit events to run ballot protocol according to the ballot configuration
if self.event_sender.send(PartyEvent::Launch1a).is_err() {
NikitaMasych marked this conversation as resolved.
Show resolved Hide resolved
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("Failed to send Launch1a event".into()));
return Err(BallotError::Communication(
"Failed to send Launch1a event".into(),
));
}
if self.event_sender.send(PartyEvent::Launch1b).is_err() {
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("Failed to send Launch1b event".into()));
return Err(BallotError::Communication(
"Failed to send Launch1b event".into(),
));
}
if self.event_sender.send(PartyEvent::Launch2a).is_err() {
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("Failed to send Launch2a event".into()));
return Err(BallotError::Communication(
"Failed to send Launch2a event".into(),
));
}
if self.event_sender.send(PartyEvent::Launch2av).is_err() {
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("Failed to send Launch2av event".into()));
return Err(BallotError::Communication(
"Failed to send Launch2av event".into(),
));
}
if self.event_sender.send(PartyEvent::Launch2b).is_err() {
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("Failed to send Launch2b event".into()));
return Err(BallotError::Communication(
"Failed to send Launch2b event".into(),
));
}
if self.event_sender.send(PartyEvent::Finalize).is_err() {
self.status = PartyStatus::Failed;
return Err(BallotError::Communication("Failed to send Finalize event".into()));
return Err(BallotError::Communication(
"Failed to send Finalize event".into(),
));
}
}

@@ -269,7 +281,9 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
.map_err(|_| BallotError::MessageParsing("Failed to parse Msg1a".into()))?;

if msg.ballot != self.ballot {
return Err(BallotError::InvalidState("Ballot number mismatch in Msg1a".into()));
return Err(BallotError::InvalidState(
"Ballot number mismatch in Msg1a".into(),
));
}

if routing.sender != self.get_leader()? {
@@ -283,12 +297,16 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
.map_err(|_| BallotError::MessageParsing("Failed to parse Msg1b".into()))?;

if msg.ballot != self.ballot {
return Err(BallotError::InvalidState("Ballot number mismatch in Msg1b".into()));
return Err(BallotError::InvalidState(
"Ballot number mismatch in Msg1b".into(),
));
}

if let Some(last_ballot_voted) = msg.last_ballot_voted {
if last_ballot_voted >= self.ballot {
return Err(BallotError::InvalidState("Received outdated 1b message".into()));
return Err(BallotError::InvalidState(
"Received outdated 1b message".into(),
NikitaMasych marked this conversation as resolved.
Show resolved Hide resolved
));
}
}

@@ -309,36 +327,51 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
.map_err(|_| BallotError::MessageParsing("Failed to parse Msg2a".into()))?;

if msg.ballot != self.ballot {
return Err(BallotError::InvalidState("Ballot number mismatch in Msg2a".into()));
return Err(BallotError::InvalidState(
"Ballot number mismatch in Msg2a".into(),
));
}

if routing.sender != self.get_leader()? {
return Err(BallotError::InvalidState("Invalid leader in Msg2a".into()));
}

let value_received: V = serde_json::from_slice(msg.value.as_slice())
.map_err(|_| BallotError::MessageParsing("Failed to parse value in Msg2a".into()))?;
let value_received: V =
serde_json::from_slice(msg.value.as_slice()).map_err(|_| {
BallotError::MessageParsing("Failed to parse value in Msg2a".into())
})?;

if self.value_selector.verify(&value_received, &self.parties_voted_before) {
if self
.value_selector
.verify(&value_received, &self.parties_voted_before)
{
self.status = PartyStatus::Passed2a;
self.value_2a = Some(value_received);
} else {
return Err(BallotError::InvalidState("Failed to verify value in Msg2a".into()));
return Err(BallotError::InvalidState(
"Failed to verify value in Msg2a".into(),
));
}
}
ProtocolMessage::Msg2av => {
let msg: Message2avContent = serde_json::from_slice(m.as_slice())
.map_err(|_| BallotError::MessageParsing("Failed to parse Msg2av".into()))?;

if msg.ballot != self.ballot {
return Err(BallotError::InvalidState("Ballot number mismatch in Msg2av".into()));
return Err(BallotError::InvalidState(
"Ballot number mismatch in Msg2av".into(),
));
}

let value_received: V = serde_json::from_slice(msg.received_value.as_slice())
.map_err(|_| BallotError::MessageParsing("Failed to parse value in Msg2av".into()))?;
.map_err(|_| {
BallotError::MessageParsing("Failed to parse value in Msg2av".into())
})?;

if value_received != self.value_2a.clone().unwrap() {
return Err(BallotError::InvalidState("Received different value in Msg2av".into()));
return Err(BallotError::InvalidState(
"Received different value in Msg2av".into(),
));
}

if !self.messages_2av_senders.contains(&routing.sender) {
@@ -356,7 +389,9 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
.map_err(|_| BallotError::MessageParsing("Failed to parse Msg2b".into()))?;

if msg.ballot != self.ballot {
return Err(BallotError::InvalidState("Ballot number mismatch in Msg2b".into()));
return Err(BallotError::InvalidState(
"Ballot number mismatch in Msg2b".into(),
));
}

if self.messages_2av_senders.contains(&routing.sender)
@@ -380,73 +415,116 @@ impl<V: Value, VS: ValueSelector<V>> Party<V, VS> {
match event {
PartyEvent::Launch1a => {
if self.status != PartyStatus::Launched {
return Err(BallotError::InvalidState("Cannot launch 1a, incorrect state".into()));
return Err(BallotError::InvalidState(
"Cannot launch 1a, incorrect state".into(),
));
}
if self.get_leader()? == self.id {
self.msg_out_sender.send(MessageWire {
content_bytes: serde_json::to_vec(&Message1aContent { ballot: self.ballot })
.map_err(|_| BallotError::MessageParsing("Failed to serialize Msg1a".into()))?,
routing: Message1aContent::get_routing(self.id),
}).map_err(|_| BallotError::Communication("Failed to send Msg1a".into()))?;
self.msg_out_sender
.send(MessageWire {
content_bytes: serde_json::to_vec(&Message1aContent {
ballot: self.ballot,
})
.map_err(|_| {
BallotError::MessageParsing("Failed to serialize Msg1a".into())
})?,
routing: Message1aContent::get_routing(self.id),
})
.map_err(|_| BallotError::Communication("Failed to send Msg1a".into()))?;
}
}
PartyEvent::Launch1b => {
if self.status != PartyStatus::Passed1a {
return Err(BallotError::InvalidState("Cannot launch 1b, incorrect state".into()));
return Err(BallotError::InvalidState(
"Cannot launch 1b, incorrect state".into(),
));
}
self.msg_out_sender.send(MessageWire {
content_bytes: serde_json::to_vec(&Message1bContent {
ballot: self.ballot,
last_ballot_voted: self.last_ballot_voted,
last_value_voted: self.last_value_voted.clone(),
self.msg_out_sender
.send(MessageWire {
content_bytes: serde_json::to_vec(&Message1bContent {
ballot: self.ballot,
last_ballot_voted: self.last_ballot_voted,
last_value_voted: self.last_value_voted.clone(),
})
.map_err(|_| {
BallotError::MessageParsing("Failed to serialize Msg1b".into())
})?,
routing: Message1bContent::get_routing(self.id),
})
.map_err(|_| BallotError::MessageParsing("Failed to serialize Msg1b".into()))?,
routing: Message1bContent::get_routing(self.id),
}).map_err(|_| BallotError::Communication("Failed to send Msg1b".into()))?;
.map_err(|_| BallotError::Communication("Failed to send Msg1b".into()))?;
}
PartyEvent::Launch2a => {
if self.status != PartyStatus::Passed1b {
return Err(BallotError::InvalidState("Cannot launch 2a, incorrect state".into()));
return Err(BallotError::InvalidState(
"Cannot launch 2a, incorrect state".into(),
));
}
if self.get_leader()? == self.id {
self.msg_out_sender.send(MessageWire {
content_bytes: serde_json::to_vec(&Message2aContent {
ballot: self.ballot,
value: serde_json::to_vec(&self.get_value())
.map_err(|_| BallotError::MessageParsing("Failed to serialize value for Msg2a".into()))?,
self.msg_out_sender
.send(MessageWire {
content_bytes: serde_json::to_vec(&Message2aContent {
ballot: self.ballot,
value: serde_json::to_vec(&self.get_value()).map_err(|_| {
BallotError::MessageParsing(
"Failed to serialize value for Msg2a".into(),
)
})?,
})
.map_err(|_| {
BallotError::MessageParsing("Failed to serialize Msg2a".into())
})?,
routing: Message2aContent::get_routing(self.id),
})
.map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2a".into()))?,
routing: Message2aContent::get_routing(self.id),
}).map_err(|_| BallotError::Communication("Failed to send Msg2a".into()))?;
.map_err(|_| BallotError::Communication("Failed to send Msg2a".into()))?;
}
}
PartyEvent::Launch2av => {
if self.status != PartyStatus::Passed2a {
return Err(BallotError::InvalidState("Cannot launch 2av, incorrect state".into()));
return Err(BallotError::InvalidState(
"Cannot launch 2av, incorrect state".into(),
));
}
self.msg_out_sender.send(MessageWire {
content_bytes: serde_json::to_vec(&Message2avContent {
ballot: self.ballot,
received_value: serde_json::to_vec(&self.value_2a.clone().unwrap())
.map_err(|_| BallotError::MessageParsing("Failed to serialize value for Msg2av".into()))?,
self.msg_out_sender
.send(MessageWire {
content_bytes: serde_json::to_vec(&Message2avContent {
ballot: self.ballot,
received_value: serde_json::to_vec(&self.value_2a.clone().unwrap())
.map_err(|_| {
BallotError::MessageParsing(
"Failed to serialize value for Msg2av".into(),
)
})?,
})
.map_err(|_| {
BallotError::MessageParsing("Failed to serialize Msg2av".into())
})?,
routing: Message2avContent::get_routing(self.id),
})
.map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2av".into()))?,
routing: Message2avContent::get_routing(self.id),
}).map_err(|_| BallotError::Communication("Failed to send Msg2av".into()))?;
.map_err(|_| BallotError::Communication("Failed to send Msg2av".into()))?;
}
PartyEvent::Launch2b => {
if self.status != PartyStatus::Passed2av {
return Err(BallotError::InvalidState("Cannot launch 2b, incorrect state".into()));
return Err(BallotError::InvalidState(
"Cannot launch 2b, incorrect state".into(),
));
}
self.msg_out_sender.send(MessageWire {
content_bytes: serde_json::to_vec(&Message2bContent { ballot: self.ballot })
.map_err(|_| BallotError::MessageParsing("Failed to serialize Msg2b".into()))?,
routing: Message2bContent::get_routing(self.id),
}).map_err(|_| BallotError::Communication("Failed to send Msg2b".into()))?;
self.msg_out_sender
.send(MessageWire {
content_bytes: serde_json::to_vec(&Message2bContent {
ballot: self.ballot,
})
.map_err(|_| {
BallotError::MessageParsing("Failed to serialize Msg2b".into())
})?,
routing: Message2bContent::get_routing(self.id),
})
.map_err(|_| BallotError::Communication("Failed to send Msg2b".into()))?;
}
PartyEvent::Finalize => {
if self.status != PartyStatus::Passed2b {
return Err(BallotError::InvalidState("Cannot finalize, incorrect state".into()));
return Err(BallotError::InvalidState(
"Cannot finalize, incorrect state".into(),
));
}
self.status = PartyStatus::Finished;
}
@@ -532,7 +610,9 @@ mod tests {
routing,
};

party.update_state(msg_wire.content_bytes, msg_wire.routing).unwrap();
party
.update_state(msg_wire.content_bytes, msg_wire.routing)
.unwrap();
assert_eq!(party.status, PartyStatus::Passed1a);
}
}