From c3589c3f8020bad4d4f3d2b963d7e56957744046 Mon Sep 17 00:00:00 2001 From: akildemir Date: Thu, 28 Sep 2023 20:23:15 +0300 Subject: [PATCH] handle not locally provided txs --- coordinator/src/tributary/scanner.rs | 5 + coordinator/tributary/src/block.rs | 17 ++- coordinator/tributary/src/blockchain.rs | 22 +++- coordinator/tributary/src/lib.rs | 4 + coordinator/tributary/src/provided.rs | 104 ++++++++++++------ coordinator/tributary/src/tendermint/mod.rs | 11 +- coordinator/tributary/src/tests/block.rs | 4 + coordinator/tributary/src/tests/blockchain.rs | 90 +++++++++------ 8 files changed, 183 insertions(+), 74 deletions(-) diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index 5d8f00168..f30682914 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -120,6 +120,11 @@ pub(crate) async fn handle_new_blocks< let mut last_block = db.last_block(genesis); while let Some(next) = tributary.block_after(&last_block) { let block = tributary.block(&next).unwrap(); + + if !tributary.provided_waiting_list_empty() { + return; + } + handle_block::<_, _, _, _, _, _, P>( db, key, diff --git a/coordinator/tributary/src/block.rs b/coordinator/tributary/src/block.rs index 0a7a2f259..c10377627 100644 --- a/coordinator/tributary/src/block.rs +++ b/coordinator/tributary/src/block.rs @@ -33,7 +33,10 @@ pub enum BlockError { /// An unsigned transaction which was already added to the chain was present again. #[error("an unsigned transaction which was already added to the chain was present again")] UnsignedAlreadyIncluded, - /// Transactions weren't ordered as expected (Provided, followed by Unsigned, folowed by Signed). + /// A provided transaction which was already added to the chain was present again. + #[error("an provided transaction which was already added to the chain was present again")] + ProvidedAlreadyIncluded, + /// Transactions weren't ordered as expected (Provided, followed by Unsigned, followed by Signed). #[error("transactions weren't ordered as expected (Provided, Unsigned, Signed)")] WrongTransactionOrder, /// The block had a provided transaction this validator has yet to be provided. @@ -175,6 +178,8 @@ impl Block { schema: N::SignatureScheme, commit: impl Fn(u32) -> Option>, unsigned_in_chain: impl Fn([u8; 32]) -> bool, + provided_in_chain: impl Fn([u8; 32]) -> bool, // TODO: merge this with unsigned_on_chain? + ignore_non_local_provided: bool, ) -> Result<(), BlockError> { #[derive(Clone, Copy, PartialEq, Eq, Debug)] enum Order { @@ -209,9 +214,17 @@ impl Block { let current_tx_order = match tx.kind() { TransactionKind::Provided(order) => { + if provided_in_chain(tx_hash) { + Err(BlockError::ProvidedAlreadyIncluded)?; + } + let Some(local) = locally_provided.get_mut(order).and_then(|deque| deque.pop_front()) else { - Err(BlockError::NonLocalProvided(txs.pop().unwrap()))? + if !ignore_non_local_provided { + Err(BlockError::NonLocalProvided(txs.pop().unwrap()))? + } else { + continue; + } }; // Since this was a provided TX, it must be an application TX let Transaction::Application(tx) = tx else { diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index d21928ec0..4f306ec3a 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -50,6 +50,9 @@ impl Blockchain { fn unsigned_included_key(genesis: &[u8], hash: &[u8; 32]) -> Vec { D::key(b"tributary_blockchain", b"unsigned_included", [genesis, hash].concat()) } + fn provided_included_key(genesis: &[u8], hash: &[u8; 32]) -> Vec { + D::key(b"tributary_blockchain", b"provided_included", [genesis, hash].concat()) + } fn next_nonce_key(&self, signer: &::G) -> Vec { D::key( b"tributary_blockchain", @@ -136,6 +139,12 @@ impl Blockchain { db.get(Self::block_after_key(&genesis, block)).map(|bytes| bytes.try_into().unwrap()) } + pub(crate) fn provided_waiting_list_empty(db: &D, genesis: [u8; 32]) -> bool { + let key = ProvidedTransactions::::waiting_list_key(genesis); + #[allow(clippy::unwrap_or_default)] + db.get(key).unwrap_or(vec![]).is_empty() + } + pub(crate) fn tip_from_db(db: &D, genesis: [u8; 32]) -> [u8; 32] { db.get(Self::tip_key(genesis)).map(|bytes| bytes.try_into().unwrap()).unwrap_or(genesis) } @@ -182,7 +191,7 @@ impl Blockchain { self.mempool.block(&self.next_nonces, unsigned_in_chain), ); // build_block should not return invalid blocks - self.verify_block::(&block, schema).unwrap(); + self.verify_block::(&block, schema, false).unwrap(); block } @@ -190,10 +199,13 @@ impl Blockchain { &self, block: &Block, schema: N::SignatureScheme, + ignore_non_local_provided: bool, ) -> Result<(), BlockError> { let db = self.db.as_ref().unwrap(); let unsigned_in_chain = |hash: [u8; 32]| db.get(Self::unsigned_included_key(&self.genesis, &hash)).is_some(); + let provided_in_chain = + |hash: [u8; 32]| db.get(Self::provided_included_key(&self.genesis, &hash)).is_some(); let commit = |block: u32| -> Option> { let commit = self.commit_by_block_number(block)?; // commit has to be valid if it is coming from our db @@ -207,6 +219,8 @@ impl Blockchain { schema, &commit, unsigned_in_chain, + provided_in_chain, + ignore_non_local_provided, ) } @@ -217,7 +231,7 @@ impl Blockchain { commit: Vec, schema: N::SignatureScheme, ) -> Result<(), BlockError> { - self.verify_block::(block, schema)?; + self.verify_block::(block, schema, true)?; log::info!( "adding block {} to tributary {} with {} TXs", @@ -249,7 +263,9 @@ impl Blockchain { for tx in &block.transactions { match tx.kind() { TransactionKind::Provided(order) => { - self.provided.complete(&mut txn, order, tx.hash()); + let hash = tx.hash(); + self.provided.complete(&mut txn, order, hash); + txn.put(Self::provided_included_key(&self.genesis, &hash), []); } TransactionKind::Unsigned => { let hash = tx.hash(); diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 3c5227b0c..a135ea6e8 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -374,6 +374,10 @@ impl TributaryReader { .map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap().end_time) } + pub fn provided_waiting_list_empty(&self) -> bool { + Blockchain::::provided_waiting_list_empty(&self.0, self.1) + } + // This isn't static, yet can be read with only minor discrepancy risks pub fn tip(&self) -> [u8; 32] { Blockchain::::tip_from_db(&self.0, self.1) diff --git a/coordinator/tributary/src/provided.rs b/coordinator/tributary/src/provided.rs index 0bf284d4a..5b98fbc89 100644 --- a/coordinator/tributary/src/provided.rs +++ b/coordinator/tributary/src/provided.rs @@ -35,6 +35,10 @@ impl ProvidedTransactions { D::key(b"tributary_provided", b"current", self.genesis) } + pub(crate) fn waiting_list_key(genesis: [u8; 32]) -> Vec { + D::key(b"tributary_provided", b"waiting_list", genesis) + } + pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self { let mut res = ProvidedTransactions { db, genesis, transactions: HashMap::new() }; @@ -71,25 +75,50 @@ impl ProvidedTransactions { } let tx_hash = tx.hash(); - let provided_key = self.transaction_key(&tx_hash); - if self.db.get(&provided_key).is_some() { - Err(ProvidedError::AlreadyProvided)?; - } - let current_provided_key = self.current_provided_key(); + // get waiting list + let waiting_list_key = Self::waiting_list_key(self.genesis); #[allow(clippy::unwrap_or_default)] - let mut currently_provided = self.db.get(¤t_provided_key).unwrap_or(vec![]); + let mut waiting_list = self.db.get(&waiting_list_key).unwrap_or(vec![]); + + // check whether this tx is a late provide + let exist = waiting_list.chunks_exact(32).position(|h| { + let hash: [u8; 32] = h.try_into().unwrap(); + hash == tx_hash + }); + if let Some(i) = exist { + // remove from the list since it is now arrived. + let i = i * 32; + assert_eq!(&waiting_list.drain(i .. (i + 32)).collect::>(), &tx_hash); + + let mut txn = self.db.txn(); + txn.put(waiting_list_key, waiting_list); + txn.commit(); + } else { + // add to mempool if not + + // check whether we already have the tx in pool + let provided_key = self.transaction_key(&tx_hash); + if self.db.get(&provided_key).is_some() { + Err(ProvidedError::AlreadyProvided)?; + } - let mut txn = self.db.txn(); - txn.put(provided_key, tx.serialize()); - currently_provided.extend(tx_hash); - txn.put(current_provided_key, currently_provided); - txn.commit(); + let current_provided_key = self.current_provided_key(); + #[allow(clippy::unwrap_or_default)] + let mut currently_provided = self.db.get(¤t_provided_key).unwrap_or(vec![]); - if self.transactions.get(order).is_none() { - self.transactions.insert(order, VecDeque::new()); + let mut txn = self.db.txn(); + txn.put(provided_key, tx.serialize()); + currently_provided.extend(tx_hash); + txn.put(current_provided_key, currently_provided); + txn.commit(); + + if self.transactions.get(order).is_none() { + self.transactions.insert(order, VecDeque::new()); + } + self.transactions.get_mut(order).unwrap().push_back(tx); } - self.transactions.get_mut(order).unwrap().push_back(tx); + Ok(()) } @@ -100,25 +129,38 @@ impl ProvidedTransactions { order: &'static str, tx: [u8; 32], ) { - assert_eq!(self.transactions.get_mut(order).unwrap().pop_front().unwrap().hash(), tx); - - let current_provided_key = self.current_provided_key(); - let mut currently_provided = txn.get(¤t_provided_key).unwrap(); - - // Find this TX's hash - let mut i = 0; - loop { - if currently_provided[i .. (i + 32)] == tx { - assert_eq!(¤tly_provided.drain(i .. (i + 32)).collect::>(), &tx); - break; + let txs = self.transactions.get_mut(order); + if txs.as_ref().is_none() || + (txs.as_ref().is_some() && !txs.as_ref().unwrap().iter().any(|t| t.hash() == tx)) + { + // we don't have this tx in our mempool, add it to waiting list. + let waiting_list_key = Self::waiting_list_key(self.genesis); + #[allow(clippy::unwrap_or_default)] + let mut waiting_list = self.db.get(&waiting_list_key).unwrap_or(vec![]); + + waiting_list.extend(tx); + txn.put(waiting_list_key, waiting_list); + } else { + assert_eq!(txs.unwrap().pop_front().unwrap().hash(), tx); + + let current_provided_key = self.current_provided_key(); + let mut currently_provided = txn.get(¤t_provided_key).unwrap(); + + // Find this TX's hash + let mut i = 0; + loop { + if currently_provided[i .. (i + 32)] == tx { + assert_eq!(¤tly_provided.drain(i .. (i + 32)).collect::>(), &tx); + break; + } + + i += 32; + if i >= currently_provided.len() { + panic!("couldn't find completed TX in currently provided"); + } } - i += 32; - if i >= currently_provided.len() { - panic!("couldn't find completed TX in currently provided"); - } + txn.put(current_provided_key, currently_provided); } - - txn.put(current_provided_key, currently_provided); } } diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index 02d3dd799..7a4451dea 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -344,12 +344,15 @@ impl Network for TendermintNetwork async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> { let block = Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?; - self.blockchain.read().await.verify_block::(&block, self.signature_scheme()).map_err( - |e| match e { + self + .blockchain + .read() + .await + .verify_block::(&block, self.signature_scheme(), false) + .map_err(|e| match e { BlockError::NonLocalProvided(_) => TendermintBlockError::Temporal, _ => TendermintBlockError::Fatal, - }, - ) + }) } async fn add_block( diff --git a/coordinator/tributary/src/tests/block.rs b/coordinator/tributary/src/tests/block.rs index 2bc4b8235..39b2ff7ef 100644 --- a/coordinator/tributary/src/tests/block.rs +++ b/coordinator/tributary/src/tests/block.rs @@ -91,6 +91,8 @@ fn empty_block() { validators, commit, unsigned_in_chain, + unsigned_in_chain, + false, ) .unwrap(); } @@ -123,6 +125,8 @@ fn duplicate_nonces() { validators.clone(), commit, unsigned_in_chain, + unsigned_in_chain, + false, ); if i == 1 { res.unwrap(); diff --git a/coordinator/tributary/src/tests/blockchain.rs b/coordinator/tributary/src/tests/blockchain.rs index 05357cb1d..c0fb677a6 100644 --- a/coordinator/tributary/src/tests/blockchain.rs +++ b/coordinator/tributary/src/tests/blockchain.rs @@ -48,7 +48,7 @@ fn block_addition() { assert_eq!(block.header.parent, genesis); assert_eq!(block.header.transactions, [0; 32]); - blockchain.verify_block::(&block, validators.clone()).unwrap(); + blockchain.verify_block::(&block, validators.clone(), false).unwrap(); assert!(blockchain.add_block::(&block, vec![], validators).is_ok()); assert_eq!(blockchain.tip(), block.hash()); assert_eq!(blockchain.block_number(), 1); @@ -71,14 +71,14 @@ fn invalid_block() { #[allow(clippy::redundant_clone)] // False positive let mut block = block.clone(); block.header.parent = Blake2s256::digest(block.header.parent).into(); - assert!(blockchain.verify_block::(&block, validators.clone()).is_err()); + assert!(blockchain.verify_block::(&block, validators.clone(), false).is_err()); } // Mutate tranactions merkle { let mut block = block; block.header.transactions = Blake2s256::digest(block.header.transactions).into(); - assert!(blockchain.verify_block::(&block, validators.clone()).is_err()); + assert!(blockchain.verify_block::(&block, validators.clone(), false).is_err()); } let key = Zeroizing::new(::F::random(&mut OsRng)); @@ -89,7 +89,7 @@ fn invalid_block() { // Manually create the block to bypass build_block's checks let block = Block::new(blockchain.tip(), vec![], vec![Transaction::Application(tx.clone())]); assert_eq!(block.header.transactions, merkle(&[tx.hash()])); - assert!(blockchain.verify_block::(&block, validators.clone()).is_err()); + assert!(blockchain.verify_block::(&block, validators.clone(), false).is_err()); } // Run the rest of the tests with them as a participant @@ -99,7 +99,7 @@ fn invalid_block() { { let block = Block::new(blockchain.tip(), vec![], vec![Transaction::Application(tx.clone())]); assert_eq!(block.header.transactions, merkle(&[tx.hash()])); - blockchain.verify_block::(&block, validators.clone()).unwrap(); + blockchain.verify_block::(&block, validators.clone(), false).unwrap(); } { @@ -112,11 +112,11 @@ fn invalid_block() { )); let mut block = blockchain.build_block::(validators.clone()); assert_eq!(block.header.transactions, merkle(&[tx.hash()])); - blockchain.verify_block::(&block, validators.clone()).unwrap(); + blockchain.verify_block::(&block, validators.clone(), false).unwrap(); // And verify mutating the transactions merkle now causes a failure block.header.transactions = merkle(&[]); - assert!(blockchain.verify_block::(&block, validators.clone()).is_err()); + assert!(blockchain.verify_block::(&block, validators.clone(), false).is_err()); } { @@ -124,7 +124,7 @@ fn invalid_block() { let tx = crate::tests::signed_transaction(&mut OsRng, genesis, &key, 5); // Manually create the block to bypass build_block's checks let block = Block::new(blockchain.tip(), vec![], vec![Transaction::Application(tx)]); - assert!(blockchain.verify_block::(&block, validators.clone()).is_err()); + assert!(blockchain.verify_block::(&block, validators.clone(), false).is_err()); } { @@ -136,14 +136,14 @@ fn invalid_block() { validators.clone() )); let mut block = blockchain.build_block::(validators.clone()); - blockchain.verify_block::(&block, validators.clone()).unwrap(); + blockchain.verify_block::(&block, validators.clone(), false).unwrap(); match &mut block.transactions[0] { Transaction::Application(tx) => { tx.1.signature.s += ::F::ONE; } _ => panic!("non-signed tx found"), } - assert!(blockchain.verify_block::(&block, validators.clone()).is_err()); + assert!(blockchain.verify_block::(&block, validators.clone(), false).is_err()); // Make sure this isn't because the merkle changed due to the transaction hash including the // signature (which it explicitly isn't allowed to anyways) @@ -191,7 +191,7 @@ fn signed_transaction() { ); // Verify and add the block - blockchain.verify_block::(&block, validators.clone()).unwrap(); + blockchain.verify_block::(&block, validators.clone(), false).unwrap(); assert!(blockchain.add_block::(&block, vec![], validators.clone()).is_ok()); assert_eq!(blockchain.tip(), block.hash()); }; @@ -215,42 +215,64 @@ fn signed_transaction() { fn provided_transaction() { let genesis = new_genesis(); let validators = Arc::new(Validators::new(genesis, vec![]).unwrap()); - let (_, mut blockchain) = new_blockchain::(genesis, &[]); + let (db, mut blockchain) = new_blockchain::(genesis, &[]); let tx = random_provided_transaction(&mut OsRng); // This should be providable - let mut db = MemDb::new(); - let mut txs = ProvidedTransactions::<_, ProvidedTransaction>::new(db.clone(), genesis); + let mut temp_db = MemDb::new(); + let mut txs = ProvidedTransactions::<_, ProvidedTransaction>::new(temp_db.clone(), genesis); txs.provide(tx.clone()).unwrap(); assert_eq!(txs.provide(tx.clone()), Err(ProvidedError::AlreadyProvided)); assert_eq!( - ProvidedTransactions::<_, ProvidedTransaction>::new(db.clone(), genesis).transactions, + ProvidedTransactions::<_, ProvidedTransaction>::new(temp_db.clone(), genesis).transactions, HashMap::from([("provided", VecDeque::from([tx.clone()]))]), ); - let mut txn = db.txn(); + let mut txn = temp_db.txn(); txs.complete(&mut txn, "provided", tx.hash()); txn.commit(); assert!(ProvidedTransactions::<_, ProvidedTransaction>::new(db.clone(), genesis) .transactions .is_empty()); - // Non-provided transactions should fail verification - let block = Block::new(blockchain.tip(), vec![tx.clone()], vec![]); - assert!(blockchain.verify_block::(&block, validators.clone()).is_err()); + // case we have the block's provided txs in our local as well + { + // Non-provided transactions should fail verification because we don't have them locally. + let block = Block::new(blockchain.tip(), vec![tx.clone()], vec![]); + assert!(blockchain.verify_block::(&block, validators.clone(), false).is_err()); + + // Provided transactions should pass verification + blockchain.provide_transaction(tx.clone()).unwrap(); + blockchain.verify_block::(&block, validators.clone(), false).unwrap(); - // Provided transactions should pass verification - blockchain.provide_transaction(tx.clone()).unwrap(); - blockchain.verify_block::(&block, validators.clone()).unwrap(); + // add_block should work for verified blocks + assert!(blockchain.add_block::(&block, vec![], validators.clone()).is_ok()); - // add_block should work for verified blocks - assert!(blockchain.add_block::(&block, vec![], validators.clone()).is_ok()); + let block = Block::new(blockchain.tip(), vec![tx.clone()], vec![]); - let block = Block::new(blockchain.tip(), vec![tx], vec![]); - // The provided transaction should no longer considered provided, causing this error - assert!(blockchain.verify_block::(&block, validators.clone()).is_err()); - // add_block should fail for unverified provided transactions if told to add them - assert!(blockchain.add_block::(&block, vec![], validators.clone()).is_err()); + // The provided transaction should no longer considered provided but added to chain, + // causing this error + assert_eq!( + blockchain.verify_block::(&block, validators.clone(), false), + Err(BlockError::ProvidedAlreadyIncluded) + ); + } + + // case we don't have the block's provided txs in our local + { + let tx = random_provided_transaction(&mut OsRng); + let block = Block::new(blockchain.tip(), vec![tx.clone()], vec![]); + // add_block DOES NOT fail for unverified provided transactions if told to add them, + // since now we can have them later. + assert!(blockchain.add_block::(&block, vec![], validators.clone()).is_ok()); + + // make sure waiting list is not empty now + assert!(!Blockchain::::provided_waiting_list_empty(&db, genesis)); + // we provide it now.. + blockchain.provide_transaction(tx.clone()).unwrap(); + // list has to be empty now + assert!(Blockchain::::provided_waiting_list_empty(&db, genesis)); + } } #[tokio::test] @@ -287,7 +309,7 @@ async fn tendermint_evidence_tx() { } // Verify and add the block - blockchain.verify_block::(&block, validators.clone()).unwrap(); + blockchain.verify_block::(&block, validators.clone(), false).unwrap(); assert!(blockchain.add_block::(&block, vec![], validators.clone()).is_ok()); assert_eq!(blockchain.tip(), block.hash()); }; @@ -424,7 +446,7 @@ async fn block_tx_ordering() { } // should be a valid block - blockchain.verify_block::(&block, validators.clone()).unwrap(); + blockchain.verify_block::(&block, validators.clone(), false).unwrap(); // Unsigned before Provided { @@ -433,7 +455,7 @@ async fn block_tx_ordering() { let unsigned = block.transactions.remove(128); block.transactions.insert(0, unsigned); assert_eq!( - blockchain.verify_block::(&block, validators.clone()).unwrap_err(), + blockchain.verify_block::(&block, validators.clone(), false).unwrap_err(), BlockError::WrongTransactionOrder ); } @@ -444,7 +466,7 @@ async fn block_tx_ordering() { let signed = block.transactions.remove(256); block.transactions.insert(0, signed); assert_eq!( - blockchain.verify_block::(&block, validators.clone()).unwrap_err(), + blockchain.verify_block::(&block, validators.clone(), false).unwrap_err(), BlockError::WrongTransactionOrder ); } @@ -454,7 +476,7 @@ async fn block_tx_ordering() { let mut block = block; block.transactions.swap(128, 256); assert_eq!( - blockchain.verify_block::(&block, validators.clone()).unwrap_err(), + blockchain.verify_block::(&block, validators.clone(), false).unwrap_err(), BlockError::WrongTransactionOrder ); }