Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rebase onto develop, which reverted this PR, and re-apply this PR
Browse files Browse the repository at this point in the history
kayabaNerve committed Apr 28, 2024

Verified

This commit was signed with the committer’s verified signature.
kayabaNerve Luke Parker
1 parent f5b34e2 commit 72419d7
Showing 8 changed files with 128 additions and 528 deletions.
34 changes: 2 additions & 32 deletions coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::{marker::PhantomData, fmt::Debug};
use std::{sync::Arc, io, collections::VecDeque};
use std::{sync::Arc, io};

use async_trait::async_trait;

@@ -154,14 +154,6 @@ pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,

p2p_meta_task_handle: Arc<tokio::task::AbortHandle>,
}

// Cleanup for `Tributary`: dropping it aborts the task behind `p2p_meta_task_handle`.
impl<D: Db, T: TransactionTrait, P: P2p> Drop for Tributary<D, T, P> {
// Calls `abort()` on the stored `tokio::task::AbortHandle`.
// NOTE(review): the handle is held in an `Arc`; if `Tributary` values can share it, dropping any
// one of them aborts the task for all of them — confirm this is intended.
fn drop(&mut self) {
self.p2p_meta_task_handle.abort();
}
}

impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
@@ -193,28 +185,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
);
let blockchain = Arc::new(RwLock::new(blockchain));

let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new()));
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
// P2P layer
let p2p_meta_task_handle = Arc::new(
tokio::spawn({
let to_rebroadcast = to_rebroadcast.clone();
let p2p = p2p.clone();
async move {
loop {
let to_rebroadcast = to_rebroadcast.read().await.clone();
for msg in to_rebroadcast {
p2p.broadcast(genesis, msg).await;
}
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
}
}
})
.abort_handle(),
);

let network =
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };

let TendermintHandle { synced_block, synced_block_result, messages, machine } =
TendermintMachine::new(
@@ -235,7 +206,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
synced_block: Arc::new(RwLock::new(synced_block)),
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
messages: Arc::new(RwLock::new(messages)),
p2p_meta_task_handle,
})
}

32 changes: 2 additions & 30 deletions coordinator/tributary/src/tendermint/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use core::ops::Deref;
use std::{
sync::Arc,
collections::{VecDeque, HashMap},
};
use std::{sync::Arc, collections::HashMap};

use async_trait::async_trait;

@@ -270,8 +267,6 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
pub(crate) validators: Arc<Validators>,
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,

pub(crate) to_rebroadcast: Arc<RwLock<VecDeque<Vec<u8>>>>,

pub(crate) p2p: P,
}

@@ -308,26 +303,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
to_broadcast.extend(msg.encode());

// Since we're broadcasting a Tendermint message, set it to be rebroadcast every 60 seconds
// until the block it's trying to build is complete
// If the P2P layer drops a message before all nodes obtained access, or a node had an
// intermittent failure, this will ensure reconciliation
// This is atrocious if there's no content-based deduplication protocol for messages actively
// being gossiped
// LibP2p, as used by Serai, is configured to content-based deduplicate
{
let mut to_rebroadcast_lock = self.to_rebroadcast.write().await;
to_rebroadcast_lock.push_back(to_broadcast.clone());
// We should have, ideally, 3 * validators messages within a round
// Therefore, this should keep the most recent 2-rounds
// TODO: This isn't perfect. Each participant should just rebroadcast their latest round of
// messages
while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) {
to_rebroadcast_lock.pop_front();
}
}

self.p2p.broadcast(self.genesis, to_broadcast).await
}

@@ -366,7 +341,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}
}

async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> {
async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> {
let block =
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
self
@@ -428,9 +403,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}
}

// Since we've added a valid block, clear to_rebroadcast
*self.to_rebroadcast.write().await = VecDeque::new();

Some(TendermintBlock(
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
))
17 changes: 8 additions & 9 deletions coordinator/tributary/tendermint/src/block.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ use std::{
collections::{HashSet, HashMap},
};

use parity_scale_codec::Encode;
use serai_db::{Get, DbTxn, Db};

use crate::{
@@ -20,7 +19,7 @@ pub(crate) struct BlockData<N: Network> {

pub(crate) number: BlockNumber,
pub(crate) validator_id: Option<N::ValidatorId>,
pub(crate) proposal: Option<N::Block>,
pub(crate) our_proposal: Option<N::Block>,

pub(crate) log: MessageLog<N>,
pub(crate) slashes: HashSet<N::ValidatorId>,
@@ -43,15 +42,15 @@ impl<N: Network> BlockData<N> {
weights: Arc<N::Weights>,
number: BlockNumber,
validator_id: Option<N::ValidatorId>,
proposal: Option<N::Block>,
our_proposal: Option<N::Block>,
) -> BlockData<N> {
BlockData {
db,
genesis,

number,
validator_id,
proposal,
our_proposal,

log: MessageLog::new(weights),
slashes: HashSet::new(),
@@ -108,17 +107,17 @@ impl<N: Network> BlockData<N> {
self.populate_end_time(round);
}

// 11-13
// L11-13
self.round = Some(RoundData::<N>::new(
round,
time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]),
));
self.end_time.insert(round, self.round().end_time());

// 14-21
// L14-21
if Some(proposer) == self.validator_id {
let (round, block) = self.valid.clone().unzip();
block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block))
block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block))
} else {
self.round_mut().set_timeout(Step::Propose);
None
@@ -198,8 +197,8 @@ impl<N: Network> BlockData<N> {
assert!(!new_round);
None?;
}
// Put this message to the DB
txn.put(&msg_key, res.encode());
// Put that we're sending this message to the DB
txn.put(&msg_key, []);

txn.commit();
}
2 changes: 1 addition & 1 deletion coordinator/tributary/tendermint/src/ext.rs
Original file line number Diff line number Diff line change
@@ -288,7 +288,7 @@ pub trait Network: Sized + Send + Sync {
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent);

/// Validate a block.
async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>;
async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>;

/// Add a block, returning the proposal for the next one.
///
552 changes: 105 additions & 447 deletions coordinator/tributary/tendermint/src/lib.rs

Large diffs are not rendered by default.

15 changes: 7 additions & 8 deletions coordinator/tributary/tendermint/src/message_log.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use std::{sync::Arc, collections::HashMap};

use parity_scale_codec::Encode;

use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence};
use crate::{ext::*, RoundNumber, Step, DataFor, SignedMessageFor, Evidence};

type RoundLog<N> = HashMap<<N as Network>::ValidatorId, HashMap<Step, SignedMessageFor<N>>>;
pub(crate) struct MessageLog<N: Network> {
@@ -16,7 +16,7 @@ impl<N: Network> MessageLog<N> {
}

// Returns true if it's a new message
pub(crate) fn log(&mut self, signed: SignedMessageFor<N>) -> Result<bool, TendermintError<N>> {
pub(crate) fn log(&mut self, signed: SignedMessageFor<N>) -> Result<bool, Evidence> {
let msg = &signed.msg;
// Clarity, and safety around default != new edge cases
let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
@@ -30,10 +30,7 @@ impl<N: Network> MessageLog<N> {
target: "tendermint",
"Validator sent multiple messages for the same block + round + step"
);
Err(TendermintError::Malicious(
msg.sender,
Some(Evidence::ConflictingMessages(existing.encode(), signed.encode())),
))?;
Err(Evidence::ConflictingMessages(existing.encode(), signed.encode()))?;
}
return Ok(false);
}
@@ -47,7 +44,8 @@ impl<N: Network> MessageLog<N> {
pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor<N>) -> (u64, u64) {
let mut participating = 0;
let mut weight = 0;
for (participant, msgs) in &self.log[&round] {
let Some(log) = self.log.get(&round) else { return (0, 0) };
for (participant, msgs) in log {
if let Some(msg) = msgs.get(&data.step()) {
let validator_weight = self.weights.weight(*participant);
participating += validator_weight;
@@ -73,7 +71,8 @@ impl<N: Network> MessageLog<N> {
// Check if a supermajority of nodes have participated on a specific step
pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool {
let mut participating = 0;
for (participant, msgs) in &self.log[&round] {
let Some(log) = self.log.get(&round) else { return false };
for (participant, msgs) in log {
if msgs.get(&step).is_some() {
participating += self.weights.weight(*participant);
}
2 changes: 2 additions & 0 deletions coordinator/tributary/tendermint/src/round.rs
Original file line number Diff line number Diff line change
@@ -57,13 +57,15 @@ impl<N: Network> RoundData<N> {

// Poll all set timeouts, returning the Step whose timeout has just expired
pub(crate) async fn timeout_future(&self) -> Step {
/*
let now = Instant::now();
log::trace!(
target: "tendermint",
"getting timeout_future, from step {:?}, off timeouts: {:?}",
self.step,
self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>()
);
*/

let timeout_future = |step| {
let timeout = self.timeouts.get(&step).copied();
2 changes: 1 addition & 1 deletion coordinator/tributary/tendermint/tests/ext.rs
Original file line number Diff line number Diff line change
@@ -145,7 +145,7 @@ impl Network for TestNetwork {
println!("Slash for {id} due to {event:?}");
}

async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> {
async fn validate(&self, block: &TestBlock) -> Result<(), BlockError> {
block.valid
}

0 comments on commit 72419d7

Please sign in to comment.