diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index f30682914..240508525 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -12,6 +12,7 @@ use tributary::{ tx::{TendermintTx, decode_evidence}, TendermintNetwork, }, + TransactionKind, }; use serai_db::DbTxn; @@ -121,8 +122,18 @@ pub(crate) async fn handle_new_blocks< while let Some(next) = tributary.block_after(&last_block) { let block = tributary.block(&next).unwrap(); - if !tributary.provided_waiting_list_empty() { - return; + for tx in &block.transactions { + // since we know provided txs are the first in the block, we can assume that + // all of them were ok if we haven't returned yet and got a new kind, so we can + // break and continue to scan the block. + let TransactionKind::Provided(order) = tx.kind() else { + break; + }; + + // make sure we have all the provided txs in this block locally + if !tributary.provided_txs_ok_for_block(&block.hash(), order) { + return; + } } handle_block::<_, _, _, _, _, _, P>( diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index 4f306ec3a..a3d97429d 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -139,10 +139,32 @@ impl Blockchain { db.get(Self::block_after_key(&genesis, block)).map(|bytes| bytes.try_into().unwrap()) } - pub(crate) fn provided_waiting_list_empty(db: &D, genesis: [u8; 32]) -> bool { - let key = ProvidedTransactions::::waiting_list_key(genesis); + // TODO: no idea what to call this function + pub(crate) fn provided_txs_ok_for_block( + db: &D, + genesis: &[u8; 32], + block: &[u8; 32], + order: &str, + ) -> bool { + let local_key = ProvidedTransactions::::local_transaction_no_key(genesis, order); + let on_chain_key = ProvidedTransactions::::last_tx_block_order_key(genesis, block, order); #[allow(clippy::unwrap_or_default)] - db.get(key).unwrap_or(vec![]).is_empty() + let local = u32::from_le_bytes( + db.get(local_key) + .unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec()) + .try_into() + .unwrap(), + ); + let on_chain = u32::from_le_bytes( + db.get(on_chain_key) + .unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec()) + .try_into() + .unwrap(), + ); + + // TODO: clean up the old used keys? since we don't need block-order -> tx_no + // info once we get pass the block. + local >= on_chain } pub(crate) fn tip_from_db(db: &D, genesis: [u8; 32]) -> [u8; 32] { @@ -264,7 +286,7 @@ impl Blockchain { match tx.kind() { TransactionKind::Provided(order) => { let hash = tx.hash(); - self.provided.complete(&mut txn, order, hash); + self.provided.complete(&mut txn, order, self.tip, hash); txn.put(Self::provided_included_key(&self.genesis, &hash), []); } TransactionKind::Unsigned => { diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index a135ea6e8..623c3071f 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -374,8 +374,8 @@ impl TributaryReader { .map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap().end_time) } - pub fn provided_waiting_list_empty(&self) -> bool { - Blockchain::::provided_waiting_list_empty(&self.0, self.1) + pub fn provided_txs_ok_for_block(&self, hash: &[u8; 32], order: &str) -> bool { + Blockchain::::provided_txs_ok_for_block(&self.0, &self.1, hash, order) } // This isn't static, yet can be read with only minor discrepancy risks diff --git a/coordinator/tributary/src/provided.rs b/coordinator/tributary/src/provided.rs index 5b98fbc89..aaf94e325 100644 --- a/coordinator/tributary/src/provided.rs +++ b/coordinator/tributary/src/provided.rs @@ -34,9 +34,18 @@ impl ProvidedTransactions { fn current_provided_key(&self) -> Vec { D::key(b"tributary_provided", b"current", self.genesis) } - - pub(crate) fn waiting_list_key(genesis: [u8; 32]) -> Vec { - D::key(b"tributary_provided", b"waiting_list", genesis) + pub(crate) fn local_transaction_no_key(genesis: &[u8; 32], order: &str) -> Vec { + D::key(b"tributary_provided", b"local", [genesis, order.as_bytes()].concat()) + } + pub(crate) fn on_chain_transaction_no_key(genesis: &[u8; 32], order: &str) -> Vec { + D::key(b"tributary_provided", b"on_chain", [genesis, order.as_bytes()].concat()) + } + pub(crate) fn last_tx_block_order_key( + genesis: &[u8; 32], + hash: &[u8; 32], + order: &str, + ) -> Vec { + D::key(b"tributary_provided", b"on_chain", [genesis, hash, order.as_bytes()].concat()) } pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self { @@ -73,30 +82,31 @@ impl ProvidedTransactions { Ok(()) => {} Err(e) => Err(ProvidedError::InvalidProvided(e))?, } - let tx_hash = tx.hash(); - // get waiting list - let waiting_list_key = Self::waiting_list_key(self.genesis); + // get local and on-chain tx numbers + let local_key = Self::local_transaction_no_key(&self.genesis, order); + let on_chain_key = Self::on_chain_transaction_no_key(&self.genesis, order); #[allow(clippy::unwrap_or_default)] - let mut waiting_list = self.db.get(&waiting_list_key).unwrap_or(vec![]); - - // check whether this tx is a late provide - let exist = waiting_list.chunks_exact(32).position(|h| { - let hash: [u8; 32] = h.try_into().unwrap(); - hash == tx_hash - }); - if let Some(i) = exist { - // remove from the list since it is now arrived. - let i = i * 32; - assert_eq!(&waiting_list.drain(i .. (i + 32)).collect::>(), &tx_hash); - - let mut txn = self.db.txn(); - txn.put(waiting_list_key, waiting_list); - txn.commit(); - } else { - // add to mempool if not - + let on_chain_tx_no = u32::from_le_bytes( + self + .db + .get(on_chain_key) + .unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec()) + .try_into() + .unwrap(), + ); + let mut local_tx_no = u32::from_le_bytes( + self + .db + .get(&local_key) + .unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec()) + .try_into() + .unwrap(), + ); + + // try add to mempool if this is a new provided(we haven't seen it on-chain before). + if local_tx_no >= on_chain_tx_no { // check whether we already have the tx in pool let provided_key = self.transaction_key(&tx_hash); if self.db.get(&provided_key).is_some() { @@ -119,6 +129,12 @@ impl ProvidedTransactions { self.transactions.get_mut(order).unwrap().push_back(tx); } + // bump the tx number for the local order + local_tx_no += 1; + let mut txn = self.db.txn(); + txn.put(local_key, local_tx_no.to_le_bytes()); + txn.commit(); + Ok(()) } @@ -127,20 +143,11 @@ impl ProvidedTransactions { &mut self, txn: &mut D::Transaction<'_>, order: &'static str, + block: [u8; 32], tx: [u8; 32], ) { let txs = self.transactions.get_mut(order); - if txs.as_ref().is_none() || - (txs.as_ref().is_some() && !txs.as_ref().unwrap().iter().any(|t| t.hash() == tx)) - { - // we don't have this tx in our mempool, add it to waiting list. - let waiting_list_key = Self::waiting_list_key(self.genesis); - #[allow(clippy::unwrap_or_default)] - let mut waiting_list = self.db.get(&waiting_list_key).unwrap_or(vec![]); - - waiting_list.extend(tx); - txn.put(waiting_list_key, waiting_list); - } else { + if txs.is_some() && txs.as_ref().unwrap().iter().any(|t| t.hash() == tx) { assert_eq!(txs.unwrap().pop_front().unwrap().hash(), tx); let current_provided_key = self.current_provided_key(); @@ -162,5 +169,33 @@ impl ProvidedTransactions { txn.put(current_provided_key, currently_provided); } + + // bump the on-chain tx number. + let on_chain_key = Self::on_chain_transaction_no_key(&self.genesis, order); + let block_order_key = Self::last_tx_block_order_key(&self.genesis, &block, order); + let mut on_chain_tx_no = u32::from_le_bytes( + self + .db + .get(&on_chain_key) + .unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec()) + .try_into() + .unwrap(), + ); + + // TODO: use block hash or block number block-order key? + // - Block hash is easy to use for keys and doesn't require additional api + // but it takes up too much space. + // - Block numbers are not that suitable as keys and requires additional tributary scanner + // api(block_hash -> block_no) but doesn't take much space. + + // TODO: do we need both a global save and block-order save? + // Technically it should be enough to save the tx for block-order only, but that requires + // api change in `provide` function to take block hash as well(last block of the chain), + // we should be able pass the last block from where we call it, but we do we want that? + + // save it + on_chain_tx_no += 1; + txn.put(on_chain_key, on_chain_tx_no.to_le_bytes()); + txn.put(block_order_key, on_chain_tx_no.to_le_bytes()); } } diff --git a/coordinator/tributary/src/tests/blockchain.rs b/coordinator/tributary/src/tests/blockchain.rs index c0fb677a6..e9227e28c 100644 --- a/coordinator/tributary/src/tests/blockchain.rs +++ b/coordinator/tributary/src/tests/blockchain.rs @@ -217,7 +217,7 @@ fn provided_transaction() { let validators = Arc::new(Validators::new(genesis, vec![]).unwrap()); let (db, mut blockchain) = new_blockchain::(genesis, &[]); - let tx = random_provided_transaction(&mut OsRng); + let tx = random_provided_transaction(&mut OsRng, "order1"); // This should be providable let mut temp_db = MemDb::new(); @@ -226,10 +226,10 @@ fn provided_transaction() { assert_eq!(txs.provide(tx.clone()), Err(ProvidedError::AlreadyProvided)); assert_eq!( ProvidedTransactions::<_, ProvidedTransaction>::new(temp_db.clone(), genesis).transactions, - HashMap::from([("provided", VecDeque::from([tx.clone()]))]), + HashMap::from([("order1", VecDeque::from([tx.clone()]))]), ); let mut txn = temp_db.txn(); - txs.complete(&mut txn, "provided", tx.hash()); + txs.complete(&mut txn, "order1", [0u8; 32], tx.hash()); txn.commit(); assert!(ProvidedTransactions::<_, ProvidedTransaction>::new(db.clone(), genesis) .transactions @@ -260,18 +260,88 @@ fn provided_transaction() { // case we don't have the block's provided txs in our local { - let tx = random_provided_transaction(&mut OsRng); - let block = Block::new(blockchain.tip(), vec![tx.clone()], vec![]); + let tx1 = random_provided_transaction(&mut OsRng, "order1"); + let tx2 = random_provided_transaction(&mut OsRng, "order1"); + let tx3 = random_provided_transaction(&mut OsRng, "order2"); + let tx4 = random_provided_transaction(&mut OsRng, "order2"); + // add_block DOES NOT fail for unverified provided transactions if told to add them, // since now we can have them later. - assert!(blockchain.add_block::(&block, vec![], validators.clone()).is_ok()); + let block1 = Block::new(blockchain.tip(), vec![tx1.clone(), tx3.clone()], vec![]); + assert!(blockchain.add_block::(&block1, vec![], validators.clone()).is_ok()); + + // in fact, we can have many blocks that have provided txs that we don't have locally. + let block2 = Block::new(blockchain.tip(), vec![tx2.clone(), tx4.clone()], vec![]); + assert!(blockchain.add_block::(&block2, vec![], validators.clone()).is_ok()); + + // make sure we won't return ok for the block before we actually got the txs + let TransactionKind::Provided(order) = tx1.kind() else { panic!("tx wasn't provided") }; + assert!(!Blockchain::::provided_txs_ok_for_block( + &db, + &genesis, + &block1.hash(), + order + )); + // provide the first tx + blockchain.provide_transaction(tx1).unwrap(); + // it should be ok for this order now, since the second tx has different order. + assert!(Blockchain::::provided_txs_ok_for_block( + &db, + &genesis, + &block1.hash(), + order + )); - // make sure waiting list is not empty now - assert!(!Blockchain::::provided_waiting_list_empty(&db, genesis)); - // we provide it now.. - blockchain.provide_transaction(tx.clone()).unwrap(); - // list has to be empty now - assert!(Blockchain::::provided_waiting_list_empty(&db, genesis)); + // give the second tx + let TransactionKind::Provided(order) = tx3.kind() else { panic!("tx wasn't provided") }; + assert!(!Blockchain::::provided_txs_ok_for_block( + &db, + &genesis, + &block1.hash(), + order + )); + blockchain.provide_transaction(tx3).unwrap(); + // it should be ok now for the first block + assert!(Blockchain::::provided_txs_ok_for_block( + &db, + &genesis, + &block1.hash(), + order + )); + + // provide the second block txs + let TransactionKind::Provided(order) = tx4.kind() else { panic!("tx wasn't provided") }; + // not ok yet + assert!(!Blockchain::::provided_txs_ok_for_block( + &db, + &genesis, + &block2.hash(), + order + )); + blockchain.provide_transaction(tx4).unwrap(); + // ok now + assert!(Blockchain::::provided_txs_ok_for_block( + &db, + &genesis, + &block2.hash(), + order + )); + + // provide the second block txs + let TransactionKind::Provided(order) = tx2.kind() else { panic!("tx wasn't provided") }; + assert!(!Blockchain::::provided_txs_ok_for_block( + &db, + &genesis, + &block2.hash(), + order + )); + blockchain.provide_transaction(tx2).unwrap(); + assert!(Blockchain::::provided_txs_ok_for_block( + &db, + &genesis, + &block2.hash(), + order + )); } } @@ -418,7 +488,8 @@ async fn block_tx_ordering() { assert!(blockchain.add_transaction::(true, unsigned_tx.clone(), validators.clone())); mempool.push(unsigned_tx); - let provided_tx = SignedTx::Provided(Box::new(random_provided_transaction(&mut OsRng))); + let provided_tx = + SignedTx::Provided(Box::new(random_provided_transaction(&mut OsRng, "order1"))); blockchain.provide_transaction(provided_tx.clone()).unwrap(); provided_txs.push(provided_tx); } diff --git a/coordinator/tributary/src/tests/transaction/mod.rs b/coordinator/tributary/src/tests/transaction/mod.rs index a7d3abcbe..266a11428 100644 --- a/coordinator/tributary/src/tests/transaction/mod.rs +++ b/coordinator/tributary/src/tests/transaction/mod.rs @@ -62,7 +62,11 @@ impl ReadWrite for ProvidedTransaction { impl Transaction for ProvidedTransaction { fn kind(&self) -> TransactionKind<'_> { - TransactionKind::Provided("provided") + match self.0[0] { + 1 => TransactionKind::Provided("order1"), + 2 => TransactionKind::Provided("order2"), + _ => panic!("unknown order"), + } } fn hash(&self) -> [u8; 32] { @@ -74,9 +78,17 @@ impl Transaction for ProvidedTransaction { } } -pub fn random_provided_transaction(rng: &mut R) -> ProvidedTransaction { +pub fn random_provided_transaction( + rng: &mut R, + order: &str, +) -> ProvidedTransaction { let mut data = vec![0; 512]; rng.fill_bytes(&mut data); + data[0] = match order { + "order1" => 1, + "order2" => 2, + _ => panic!("unknown order"), + }; ProvidedTransaction(data) }