diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index cd676b13a..9927473ca 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -3,7 +3,7 @@ use core::fmt::Debug; use std::{ sync::Arc, time::{SystemTime, Instant, Duration}, - collections::VecDeque, + collections::{VecDeque, HashMap}, }; use parity_scale_codec::{Encode, Decode}; @@ -245,6 +245,10 @@ pub struct TendermintMachine { synced_block_result_send: mpsc::UnboundedSender, block: BlockData, + // TODO: Move this into the Block struct + round_proposals: HashMap, N::Block)>, + // TODO: Move this into the Round struct + upons: Upons, } pub struct SyncedBlock { @@ -346,6 +350,9 @@ impl TendermintMachine { proposal, ); + // Reset the round proposals + self.round_proposals = HashMap::new(); + // Start the first round self.round(RoundNumber(0), Some(round_end)); } @@ -383,6 +390,410 @@ impl TendermintMachine { } } + fn proposal_for_round(&self, round: RoundNumber) -> Option<(Option, &N::Block)> { + self.round_proposals.get(&round).map(|(round, block)| (*round, block)) + } + + // L22-27 + fn upon_proposal_without_valid_round(&mut self) { + if self.block.round().step != Step::Propose { + return; + } + + // If we have the proposal message... + let Some((None, block)) = self.proposal_for_round(self.block.round().number) else { + return; + }; + + // There either needs to not be a locked value or it must be equivalent + #[allow(clippy::map_unwrap_or)] + if self + .block + .locked + .as_ref() + .map(|(_round, locked_block)| block.id() == *locked_block) + .unwrap_or(true) + { + self.broadcast(Data::Prevote(Some(block.id()))); + } else { + self.broadcast(Data::Prevote(None)); + } + } + + // L28-33 + fn upon_proposal_with_valid_round(&mut self) { + if self.block.round().step != Step::Propose { + return; + } + + // If we have the proposal message... + let Some((Some(proposal_valid_round), block)) = + self.proposal_for_round(self.block.round().number) + else { + return; + }; + + // Check we have the necessary prevotes + if !self.block.log.has_consensus(proposal_valid_round, &Data::Prevote(Some(block.id()))) { + return; + } + + // We don't check valid round < current round as the `message` function does + + // If locked is None, lockedRoundp is -1 and less than valid round + #[allow(clippy::map_unwrap_or)] + let locked_clause_1 = self + .block + .locked + .as_ref() + .map(|(locked_round, _block)| locked_round.0 <= proposal_valid_round.0) + .unwrap_or(true); + // The second clause is if the locked values are equivalent. If no value is locked, they aren't + #[allow(clippy::map_unwrap_or)] + let locked_clause_2 = self + .block + .locked + .as_ref() + .map(|(_round, locked_block)| block.id() == *locked_block) + .unwrap_or(false); + + if locked_clause_1 || locked_clause_2 { + self.broadcast(Data::Prevote(Some(block.id()))); + } else { + self.broadcast(Data::Prevote(None)); + } + } + + // L34-35 + fn upon_prevotes(&mut self) { + if self.upons.upon_prevotes || (self.block.round().step != Step::Prevote) { + return; + } + + if self.block.log.has_participation(self.block.round().number, Step::Prevote) { + self.block.round_mut().set_timeout(Step::Prevote); + self.upons.upon_prevotes = true; + } + } + + // L36-43 + async fn upon_successful_current_round_prevotes(&mut self) { + // Returning if `self.step == Step::Propose` is equivalent to guarding `step >= prevote` + if self.upons.upon_successful_current_round_prevotes || + (self.block.round().step == Step::Propose) + { + return; + } + + // If we have the proposal message... + let Some((_, block)) = self.proposal_for_round(self.block.round().number) else { + return; + }; + + // Check we have the necessary prevotes + if !self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) { + return; + } + + let block = block.clone(); + self.upons.upon_successful_current_round_prevotes = true; + + if self.block.round().step == Step::Prevote { + self.block.locked = Some((self.block.round().number, block.id())); + let signature = self + .signer + .sign(&commit_msg( + self.block.end_time[&self.block.round().number].canonical(), + block.id().as_ref(), + )) + .await; + self.broadcast(Data::Precommit(Some((block.id(), signature)))); + } + self.block.valid = Some((self.block.round().number, block)); + } + + // L44-46 + fn upon_negative_current_round_prevotes(&mut self) { + if self.upons.upon_negative_current_round_prevotes || (self.block.round().step != Step::Prevote) + { + return; + } + + if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(None)) { + self.broadcast(Data::Precommit(None)); + } + + self.upons.upon_negative_current_round_prevotes = true; + } + + // L47-48 + fn upon_precommits(&mut self) { + if self.upons.upon_precommits { + return; + } + + if self.block.log.has_participation(self.block.round().number, Step::Precommit) { + self.block.round_mut().set_timeout(Step::Precommit); + self.upons.upon_precommits = true; + } + } + + // L22-48 + async fn all_current_round_upons(&mut self) { + self.upon_proposal_without_valid_round(); + self.upon_proposal_with_valid_round(); + self.upon_prevotes(); + self.upon_successful_current_round_prevotes().await; + self.upon_negative_current_round_prevotes(); + self.upon_precommits(); + } + + // L49-54 + async fn upon_successful_precommits(&mut self, round: RoundNumber) -> bool { + // If we have the proposal message... + let Some((_, block)) = self.proposal_for_round(round) else { return false }; + + // Check we have the necessary precommits + // The precommit we check we have consensus upon uses a junk signature since message equality + // disregards the signature + if !self + .block + .log + .has_consensus(round, &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await)))) + { + return false; + } + + // Get all participants in this commit + let mut validators = vec![]; + let mut sigs = vec![]; + // Get all precommits for this round + for (validator, msgs) in &self.block.log.log[&round] { + if let Some(signed) = msgs.get(&Step::Precommit) { + if let Data::Precommit(Some((id, sig))) = &signed.msg.data { + // If this precommit was for this block, include it + if *id == block.id() { + validators.push(*validator); + sigs.push(sig.clone()); + } + } + } + } + + // Form the commit itself + let commit_msg = commit_msg(self.block.end_time[&round].canonical(), block.id().as_ref()); + let commit = Commit { + end_time: self.block.end_time[&round].canonical(), + validators: validators.clone(), + signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs), + }; + debug_assert!(self.network.verify_commit(block.id(), &commit)); + + // Add the block and reset the machine + log::info!( + target: "tendermint", + "TendermintMachine produced block {}", + hex::encode(block.id().as_ref()), + ); + let id = block.id(); + let proposal = self.network.add_block(block.clone(), commit).await; + log::trace!( + target: "tendermint", + "added block {} (produced by machine)", + hex::encode(id.as_ref()), + ); + self.reset(round, proposal).await; + + true + } + + // L49-54 + async fn all_any_round_upons(&mut self, round: RoundNumber) -> bool { + self.upon_successful_precommits(round).await + } + + // Returns Ok(true) if this was a Precommit which had either no signature or its signature + // validated + // Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet + // Returns Err if the signature was invalid + async fn verify_precommit_signature( + &mut self, + signed: &SignedMessageFor, + ) -> Result { + let msg = &signed.msg; + if let Data::Precommit(precommit) = &msg.data { + let Some((id, sig)) = precommit else { return Ok(true) }; + // Also verify the end_time of the commit + // Only perform this verification if we already have the end_time + // Else, there's a DoS where we receive a precommit for some round infinitely in the future + // which forces us to calculate every end time + if let Some(end_time) = self.block.end_time.get(&msg.round) { + if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) + { + log::warn!(target: "tendermint", "validator produced an invalid commit signature"); + self + .slash( + msg.sender, + SlashEvent::WithEvidence(Evidence::InvalidPrecommit(signed.encode())), + ) + .await; + Err(TendermintError::Malicious)?; + } + return Ok(true); + } + } + Ok(false) + } + + async fn message(&mut self, signed: &SignedMessageFor) -> Result<(), TendermintError> { + let msg = &signed.msg; + if msg.block != self.block.number { + Err(TendermintError::Temporal)?; + } + + // If this is a precommit, verify its signature + self.verify_precommit_signature(signed).await?; + + // Only let the proposer propose + if matches!(msg.data, Data::Proposal(..)) && + (msg.sender != self.weights.proposer(msg.block, msg.round)) + { + log::warn!(target: "tendermint", "validator who wasn't the proposer proposed"); + // TODO: This should have evidence + self + .slash(msg.sender, SlashEvent::Id(SlashReason::InvalidProposer, msg.block.0, msg.round.0)) + .await; + Err(TendermintError::Malicious)?; + }; + + // If this is a proposal, verify the block + // If the block is invalid, drop the message, letting the timeout cover it + // This prevents needing to check if valid inside every `upon` block + if let Data::Proposal(_, block) = &msg.data { + match self.network.validate(block).await { + Ok(()) => {} + Err(BlockError::Temporal) => return Err(TendermintError::Temporal), + Err(BlockError::Fatal) => { + log::warn!(target: "tendermint", "validator proposed a fatally invalid block"); + self + .slash( + msg.sender, + SlashEvent::Id(SlashReason::InvalidBlock, self.block.number.0, msg.round.0), + ) + .await; + Err(TendermintError::Malicious)?; + } + }; + } + + // If this is a proposal, verify the valid round isn't fundamentally invalid + if let Data::Proposal(Some(valid_round), _) = msg.data { + if valid_round.0 >= msg.round.0 { + log::warn!( + target: "tendermint", + "proposed proposed with a syntactically invalid valid round", + ); + self + .slash(msg.sender, SlashEvent::WithEvidence(Evidence::InvalidValidRound(msg.encode()))) + .await; + Err(TendermintError::Malicious)?; + } + } + + // Add it to the log, returning if it was already handled + match self.block.log.log(signed.clone()) { + Ok(true) => {} + Ok(false) => Err(TendermintError::AlreadyHandled)?, + Err(evidence) => { + self.slash(msg.sender, SlashEvent::WithEvidence(evidence)).await; + Err(TendermintError::Malicious)?; + } + } + log::debug!( + target: "tendermint", + "received new tendermint message (block: {}, round: {}, step: {:?})", + msg.block.0, + msg.round.0, + msg.data.step(), + ); + + // If this is a proposal, insert it + if let Data::Proposal(vr, block) = &msg.data { + self.round_proposals.insert(msg.round, (*vr, block.clone())); + } + + // L55-56 + // Jump ahead if we should + if (msg.round.0 > self.block.round().number.0) && + (self.block.log.round_participation(msg.round) >= self.weights.fault_threshold()) + { + log::debug!( + target: "tendermint", + "jumping from round {} to round {}", + self.block.round().number.0, + msg.round.0, + ); + + // Jump to the new round. + let old_round = self.block.round().number; + self.round(msg.round, None); + + // If any jumped over/to round already has precommit messages, verify their signatures + for jumped in (old_round.0 + 1) ..= msg.round.0 { + let jumped = RoundNumber(jumped); + let round_msgs = self.block.log.log.get(&jumped).cloned().unwrap_or_default(); + for (validator, msgs) in &round_msgs { + if let Some(existing) = msgs.get(&Step::Precommit) { + if let Ok(res) = self.verify_precommit_signature(existing).await { + // Ensure this actually verified the signature instead of believing it shouldn't yet + assert!(res); + } else { + // Remove the message so it isn't counted towards forming a commit/included in one + // This won't remove the fact they precommitted for this block hash in the MessageLog + // TODO: Don't even log these in the first place until we jump, preventing needing + // to do this in the first place + self + .block + .log + .log + .get_mut(&jumped) + .unwrap() + .get_mut(validator) + .unwrap() + .remove(&Step::Precommit) + .unwrap(); + } + } + } + } + } + + // Now that we've jumped, and: + // 1) If this is a message for an old round, verified the precommit signatures + // 2) If this is a message for what was the current round, verified the precommit signatures + // 3) If this is a message for what was a future round, verified the precommit signatures if it + // has 34+% participation + // Run all `upons` run for any round, which may produce a Commit if it has 67+% participation + // (returning true if it does, letting us return now) + // It's necessary to verify the precommit signatures before Commit production is allowed, hence + // this specific flow + if self.all_any_round_upons(msg.round).await { + return Ok(()); + } + + // If this is a historic round, or a future round without sufficient participation, return + if msg.round.0 != self.block.round().number.0 { + return Ok(()); + } + // msg.round is now guaranteed to be equal to self.block.round().number + debug_assert_eq!(msg.round, self.block.round().number); + + // Run all `upons` run for the current round + self.all_current_round_upons().await; + + Ok(()) + } + /// Create a new Tendermint machine, from the specified point, with the specified block as the /// one to propose next. This will return a channel to send messages from the gossip layer and /// the machine itself. The machine should have `run` called from an asynchronous task. @@ -450,6 +861,15 @@ impl TendermintMachine { validator_id, Some(proposal), ), + + round_proposals: HashMap::new(), + + upons: Upons { + upon_prevotes: false, + upon_successful_current_round_prevotes: false, + upon_negative_current_round_prevotes: false, + upon_precommits: false, + }, }; // The end time of the last block is the start time for this one diff --git a/coordinator/tributary/tendermint/src/message_log.rs b/coordinator/tributary/tendermint/src/message_log.rs index 3959852d8..a150617be 100644 --- a/coordinator/tributary/tendermint/src/message_log.rs +++ b/coordinator/tributary/tendermint/src/message_log.rs @@ -86,13 +86,4 @@ impl MessageLog { let (_, weight) = self.message_instances(round, data); weight >= self.weights.threshold() } - - pub(crate) fn get( - &self, - round: RoundNumber, - sender: N::ValidatorId, - step: Step, - ) -> Option<&SignedMessageFor> { - self.log.get(&round).and_then(|round| round.get(&sender).and_then(|msgs| msgs.get(&step))) - } }