diff --git a/Cargo.lock b/Cargo.lock index 692bdf2fa..6622efabd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9105,6 +9105,7 @@ dependencies = [ "hex", "log", "parity-scale-codec", + "serai-db", "thiserror", "tokio", ] diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 5a5df1a7a..7f174d728 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -218,7 +218,7 @@ impl Tributary { TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p }; let TendermintHandle { synced_block, synced_block_result, messages, machine } = - TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; + TendermintMachine::new(db.clone(), network.clone(), block_number, start_time, proposal).await; tokio::spawn(machine.run()); Some(Self { diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index dc62c798a..d362364c8 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -302,6 +302,8 @@ fn assert_target_block_time() { #[async_trait] impl Network for TendermintNetwork { + type Db = D; + type ValidatorId = [u8; 32]; type SignatureScheme = Arc; type Weights = Arc; diff --git a/coordinator/tributary/tendermint/Cargo.toml b/coordinator/tributary/tendermint/Cargo.toml index ba640391e..5a2905904 100644 --- a/coordinator/tributary/tendermint/Cargo.toml +++ b/coordinator/tributary/tendermint/Cargo.toml @@ -27,5 +27,7 @@ futures-util = { version = "0.3", default-features = false, features = ["std", " futures-channel = { version = "0.3", default-features = false, features = ["std", "sink"] } tokio = { version = "1", default-features = false, features = ["time"] } +serai-db = { path = "../../../common/db", version = "0.1", default-features = false } + [dev-dependencies] tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] } diff --git a/coordinator/tributary/tendermint/src/block.rs b/coordinator/tributary/tendermint/src/block.rs index 8136f888d..5b2e86eaf 100644 --- a/coordinator/tributary/tendermint/src/block.rs +++ b/coordinator/tributary/tendermint/src/block.rs @@ -3,6 +3,9 @@ use std::{ collections::{HashSet, HashMap}, }; +use parity_scale_codec::Encode; +use serai_db::{Get, DbTxn, Db}; + use crate::{ time::CanonicalInstant, ext::{RoundNumber, BlockNumber, Block, Network}, @@ -12,6 +15,8 @@ use crate::{ }; pub(crate) struct BlockData { + db: N::Db, + pub(crate) number: BlockNumber, pub(crate) validator_id: Option, pub(crate) proposal: Option, @@ -32,12 +37,15 @@ pub(crate) struct BlockData { impl BlockData { pub(crate) fn new( + db: N::Db, weights: Arc, number: BlockNumber, validator_id: Option, proposal: Option, ) -> BlockData { BlockData { + db, + number, validator_id, proposal, @@ -128,12 +136,34 @@ impl BlockData { // 27, 33, 41, 46, 60, 64 self.round_mut().step = data.step(); - // Only return a message to if we're actually a current validator - self.validator_id.map(|validator_id| Message { + // Only return a message to if we're actually a current validator and haven't prior posted a + // message + let round_number = self.round().number; + let step = data.step(); + let res = self.validator_id.map(|validator_id| Message { sender: validator_id, block: self.number, - round: self.round().number, + round: round_number, data, - }) + }); + + if res.is_some() { + let mut txn = self.db.txn(); + let key = [ + b"tendermint-machine_already_sent_message".as_ref(), + &self.number.0.to_le_bytes(), + &round_number.0.to_le_bytes(), + &step.encode(), + ] + .concat(); + // If we've already sent a message, return + if txn.get(&key).is_some() { + None?; + } + txn.put(&key, []); + txn.commit(); + } + + res } } diff --git a/coordinator/tributary/tendermint/src/ext.rs b/coordinator/tributary/tendermint/src/ext.rs index 3d13a3b3e..b3d568a23 100644 --- a/coordinator/tributary/tendermint/src/ext.rs +++ b/coordinator/tributary/tendermint/src/ext.rs @@ -212,6 +212,9 @@ pub trait Block: Send + Sync + Clone + PartialEq + Eq + Debug + Encode + Decode /// Trait representing the distributed system Tendermint is providing consensus over. #[async_trait] pub trait Network: Sized + Send + Sync { + /// The database used to back this. + type Db: serai_db::Db; + // Type used to identify validators. type ValidatorId: ValidatorId; /// Signature scheme used by validators. diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index 163db6fc7..8faf67989 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -231,6 +231,8 @@ pub enum SlashEvent { /// A machine executing the Tendermint protocol. pub struct TendermintMachine { + db: N::Db, + network: N, signer: ::Signer, validators: N::SignatureScheme, @@ -322,6 +324,7 @@ impl TendermintMachine { // Create the new block self.block = BlockData::new( + self.db.clone(), self.weights.clone(), BlockNumber(self.block.number.0 + 1), self.signer.validator_id().await, @@ -370,6 +373,7 @@ impl TendermintMachine { /// the machine itself. The machine should have `run` called from an asynchronous task. #[allow(clippy::new_ret_no_self)] pub async fn new( + db: N::Db, network: N, last_block: BlockNumber, last_time: u64, @@ -409,6 +413,8 @@ impl TendermintMachine { let validator_id = signer.validator_id().await; // 01-10 let mut machine = TendermintMachine { + db: db.clone(), + network, signer, validators, @@ -420,6 +426,7 @@ impl TendermintMachine { synced_block_result_send, block: BlockData::new( + db, weights, BlockNumber(last_block.0 + 1), validator_id, diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary/tendermint/tests/ext.rs index e3df7e489..f919b003e 100644 --- a/coordinator/tributary/tendermint/tests/ext.rs +++ b/coordinator/tributary/tendermint/tests/ext.rs @@ -10,6 +10,8 @@ use parity_scale_codec::{Encode, Decode}; use futures_util::sink::SinkExt; use tokio::{sync::RwLock, time::sleep}; +use serai_db::MemDb; + use tendermint_machine::{ ext::*, SignedMessageFor, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender, SlashEvent, TendermintMachine, TendermintHandle, @@ -111,6 +113,8 @@ struct TestNetwork( #[async_trait] impl Network for TestNetwork { + type Db = MemDb; + type ValidatorId = TestValidatorId; type SignatureScheme = TestSignatureScheme; type Weights = TestWeights; @@ -170,6 +174,7 @@ impl TestNetwork { let i = u16::try_from(i).unwrap(); let TendermintHandle { messages, synced_block, synced_block_result, machine } = TendermintMachine::new( + MemDb::new(), TestNetwork(i, arc.clone()), BlockNumber(1), start_time,