Skip to content

Commit

Permalink
Have Tributary's add_transaction return a proper error
Browse files Browse the repository at this point in the history
Modifies main.rs to properly handle the returned error.
  • Loading branch information
kayabaNerve committed Oct 15, 2023
1 parent 584943d commit 19e90b2
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 133 deletions.
22 changes: 15 additions & 7 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use tokio::{
time::sleep,
};

use ::tributary::{ProvidedError, TransactionKind, TransactionTrait, Block, Tributary};
use ::tributary::{
ProvidedError, TransactionKind, TransactionError, TransactionTrait, Block, Tributary,
};

mod tributary;
use crate::tributary::{
Expand Down Expand Up @@ -150,10 +152,16 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
.await
.expect("we don't have a nonce, meaning we aren't a participant on this tributary"),
) {
// TODO: Assert if we didn't create a valid transaction
// We need to return a proper error here to enable that, due to a race condition around
// multiple publications
tributary.add_transaction(tx).await;
match tributary.add_transaction(tx.clone()).await {
Ok(_) => {}
// Some asynchonicity if InvalidNonce, assumed safe to deterministic nonces
Err(TransactionError::InvalidNonce) => {
log::warn!("publishing TX {tx:?} returned InvalidNonce. was it already added?")
}
Err(e) => panic!("created an invalid transaction: {e:?}"),
}
}
}

Expand Down Expand Up @@ -630,10 +638,10 @@ async fn handle_processor_message<D: Db, P: P2p>(
}
TransactionKind::Unsigned => {
log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
// Ignores the result since we can't differentiate already in-mempool from
// already on-chain from invalid
// TODO: Don't ignore the result
tributary.add_transaction(tx).await;
match tributary.add_transaction(tx.clone()).await {
Ok(_) => {}
Err(e) => panic!("created an invalid unsigned transaction: {e:?}"),
}
}
TransactionKind::Signed(_) => {
log::trace!("getting next nonce for Tributary TX in response to processor message");
Expand Down
10 changes: 5 additions & 5 deletions coordinator/src/tests/tributary/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn dkg_test() {

// Publish all commitments but one
for (i, tx) in txs.iter().enumerate().skip(1) {
assert!(tributaries[i].1.add_transaction(tx.clone()).await);
assert_eq!(tributaries[i].1.add_transaction(tx.clone()).await, Ok(true));
}

// Wait until these are included
Expand Down Expand Up @@ -104,7 +104,7 @@ async fn dkg_test() {

// Publish the last commitment
let block_before_tx = tributaries[0].1.tip().await;
assert!(tributaries[0].1.add_transaction(txs[0].clone()).await);
assert_eq!(tributaries[0].1.add_transaction(txs[0].clone()).await, Ok(true));
wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, txs[0].hash()).await;
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;

Expand Down Expand Up @@ -181,7 +181,7 @@ async fn dkg_test() {

let block_before_tx = tributaries[0].1.tip().await;
for (i, tx) in txs.iter().enumerate().skip(1) {
assert!(tributaries[i].1.add_transaction(tx.clone()).await);
assert_eq!(tributaries[i].1.add_transaction(tx.clone()).await, Ok(true));
}
for tx in txs.iter().skip(1) {
wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, tx.hash()).await;
Expand All @@ -205,7 +205,7 @@ async fn dkg_test() {

// Publish the final set of shares
let block_before_tx = tributaries[0].1.tip().await;
assert!(tributaries[0].1.add_transaction(txs[0].clone()).await);
assert_eq!(tributaries[0].1.add_transaction(txs[0].clone()).await, Ok(true));
wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, txs[0].hash()).await;
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;

Expand Down Expand Up @@ -296,7 +296,7 @@ async fn dkg_test() {
}
let block_before_tx = tributaries[0].1.tip().await;
for (i, tx) in txs.iter().enumerate() {
assert!(tributaries[i].1.add_transaction(tx.clone()).await);
assert_eq!(tributaries[i].1.add_transaction(tx.clone()).await, Ok(true));
}
for tx in txs.iter() {
wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, tx.hash()).await;
Expand Down
2 changes: 1 addition & 1 deletion coordinator/src/tests/tributary/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn tx_test() {
Transaction::DkgCommitments(attempt, commitments.clone(), Transaction::empty_signed());
tx.sign(&mut OsRng, spec.genesis(), &key, 0);

assert!(tributaries[sender].1.add_transaction(tx.clone()).await);
assert_eq!(tributaries[sender].1.add_transaction(tx.clone()).await, Ok(true));
let included_in = wait_for_tx_inclusion(&tributaries[sender].1, block_before_tx, tx.hash()).await;
// Also sleep for the block time to ensure the block is synced around before we run checks on it
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
Expand Down
4 changes: 2 additions & 2 deletions coordinator/tributary/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tendermint::ext::{Network, Commit};

use crate::{
ReadWrite, ProvidedError, ProvidedTransactions, BlockError, Block, Mempool, Transaction,
transaction::{Signed, TransactionKind, Transaction as TransactionTrait},
transaction::{Signed, TransactionKind, TransactionError, Transaction as TransactionTrait},
};

#[derive(Debug)]
Expand Down Expand Up @@ -165,7 +165,7 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
internal: bool,
tx: Transaction<T>,
schema: N::SignatureScheme,
) -> bool {
) -> Result<bool, TransactionError> {
let db = self.db.as_ref().unwrap();
let genesis = self.genesis;

Expand Down
10 changes: 5 additions & 5 deletions coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,10 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
self.network.blockchain.read().await.next_nonce(signer)
}

// Returns if the transaction was new and valid.
// Returns Ok(true) if new, Ok(false) if an already present unsigned, or the error.
// Safe to be &self since the only meaningful usage of self is self.network.blockchain which
// successfully acquires its own write lock
pub async fn add_transaction(&self, tx: T) -> bool {
pub async fn add_transaction(&self, tx: T) -> Result<bool, TransactionError> {
let tx = Transaction::Application(tx);
let mut to_broadcast = vec![TRANSACTION_MESSAGE];
tx.write(&mut to_broadcast).unwrap();
Expand All @@ -268,7 +268,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
tx,
self.network.signature_scheme(),
);
if res {
if res == Ok(true) {
self.network.p2p.broadcast(self.genesis, to_broadcast).await;
}
res
Expand Down Expand Up @@ -339,8 +339,8 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
tx,
self.network.signature_scheme(),
);
log::debug!("received transaction message. valid new transaction: {res}");
res
log::debug!("received transaction message. valid new transaction: {res:?}");
res == Ok(true)
}

Some(&TENDERMINT_MESSAGE) => {
Expand Down
32 changes: 14 additions & 18 deletions coordinator/tributary/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use tendermint::ext::{Network, Commit};

use crate::{
ACCOUNT_MEMPOOL_LIMIT, ReadWrite,
transaction::{Signed, TransactionKind, Transaction as TransactionTrait, verify_transaction},
transaction::{
Signed, TransactionKind, TransactionError, Transaction as TransactionTrait, verify_transaction,
},
tendermint::tx::verify_tendermint_tx,
Transaction,
};
Expand Down Expand Up @@ -92,7 +94,7 @@ impl<D: Db, T: TransactionTrait> Mempool<D, T> {
res
}

/// Returns true if this is a valid, new transaction.
// Returns Ok(true) if new, Ok(false) if an already present unsigned, or the error.
pub(crate) fn add<N: Network>(
&mut self,
blockchain_next_nonces: &HashMap<<Ristretto as Ciphersuite>::G, u32>,
Expand All @@ -101,29 +103,27 @@ impl<D: Db, T: TransactionTrait> Mempool<D, T> {
schema: N::SignatureScheme,
unsigned_in_chain: impl Fn([u8; 32]) -> bool,
commit: impl Fn(u32) -> Option<Commit<N::SignatureScheme>>,
) -> bool {
) -> Result<bool, TransactionError> {
match &tx {
Transaction::Tendermint(tendermint_tx) => {
// All Tendermint transactions should be unsigned
assert_eq!(TransactionKind::Unsigned, tendermint_tx.kind());

// check we have the tx in the pool/chain
if self.unsigned_already_exist(tx.hash(), unsigned_in_chain) {
return false;
return Ok(false);
}

// verify the tx
if verify_tendermint_tx::<N>(tendermint_tx, schema, commit).is_err() {
return false;
}
verify_tendermint_tx::<N>(tendermint_tx, schema, commit)?;
}
Transaction::Application(app_tx) => {
match app_tx.kind() {
TransactionKind::Signed(Signed { signer, nonce, .. }) => {
// Get the nonce from the blockchain
let Some(blockchain_next_nonce) = blockchain_next_nonces.get(signer).cloned() else {
// Not a participant
return false;
Err(TransactionError::InvalidSigner)?
};

// If the blockchain's nonce is greater than the mempool's, use it
Expand All @@ -140,32 +140,28 @@ impl<D: Db, T: TransactionTrait> Mempool<D, T> {
// If we have too many transactions from this sender, don't add this yet UNLESS we are
// this sender
if !internal && (nonce >= &(blockchain_next_nonce + ACCOUNT_MEMPOOL_LIMIT)) {
return false;
Err(TransactionError::TooManyInMempool)?;
}

if verify_transaction(app_tx, self.genesis, &mut self.next_nonces).is_err() {
return false;
}
verify_transaction(app_tx, self.genesis, &mut self.next_nonces)?;
debug_assert_eq!(self.next_nonces[signer], nonce + 1);
}
TransactionKind::Unsigned => {
// check we have the tx in the pool/chain
if self.unsigned_already_exist(tx.hash(), unsigned_in_chain) {
return false;
return Ok(false);
}

if app_tx.verify().is_err() {
return false;
}
app_tx.verify()?;
}
TransactionKind::Provided(_) => return false,
TransactionKind::Provided(_) => Err(TransactionError::ProvidedAddedToMempool)?,
}
}
}

// Save the TX to the pool
self.save_tx(tx);
true
Ok(true)
}

// Returns None if the mempool doesn't have a nonce tracked.
Expand Down
3 changes: 2 additions & 1 deletion coordinator/tributary/src/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
true,
Transaction::Tendermint(tx),
self.signature_scheme(),
) {
) == Ok(true)
{
self.p2p.broadcast(signer.genesis, to_broadcast).await;
}
}
Expand Down
36 changes: 14 additions & 22 deletions coordinator/tributary/src/tests/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,9 @@ fn invalid_block() {
{
// Add a valid transaction
let (_, mut blockchain) = new_blockchain(genesis, &[tx.1.signer]);
assert!(blockchain.add_transaction::<N>(
true,
Transaction::Application(tx.clone()),
validators.clone()
));
blockchain
.add_transaction::<N>(true, Transaction::Application(tx.clone()), validators.clone())
.unwrap();
let mut block = blockchain.build_block::<N>(validators.clone());
assert_eq!(block.header.transactions, merkle(&[tx.hash()]));
blockchain.verify_block::<N>(&block, validators.clone(), false).unwrap();
Expand All @@ -130,11 +128,9 @@ fn invalid_block() {
{
// Invalid signature
let (_, mut blockchain) = new_blockchain(genesis, &[tx.1.signer]);
assert!(blockchain.add_transaction::<N>(
true,
Transaction::Application(tx),
validators.clone()
));
blockchain
.add_transaction::<N>(true, Transaction::Application(tx), validators.clone())
.unwrap();
let mut block = blockchain.build_block::<N>(validators.clone());
blockchain.verify_block::<N>(&block, validators.clone(), false).unwrap();
match &mut block.transactions[0] {
Expand Down Expand Up @@ -170,11 +166,9 @@ fn signed_transaction() {
panic!("tendermint tx found");
};
let next_nonce = blockchain.next_nonce(signer).unwrap();
assert!(blockchain.add_transaction::<N>(
true,
Transaction::Application(tx),
validators.clone()
));
blockchain
.add_transaction::<N>(true, Transaction::Application(tx), validators.clone())
.unwrap();
assert_eq!(next_nonce + 1, blockchain.next_nonce(signer).unwrap());
}
let block = blockchain.build_block::<N>(validators.clone());
Expand Down Expand Up @@ -363,11 +357,9 @@ async fn tendermint_evidence_tx() {
let Transaction::Tendermint(tx) = tx else {
panic!("non-tendermint tx found");
};
assert!(blockchain.add_transaction::<N>(
true,
Transaction::Tendermint(tx),
validators.clone()
));
blockchain
.add_transaction::<N>(true, Transaction::Tendermint(tx), validators.clone())
.unwrap();
}
let block = blockchain.build_block::<N>(validators.clone());
assert_eq!(blockchain.tip(), tip);
Expand Down Expand Up @@ -475,7 +467,7 @@ async fn block_tx_ordering() {
let signed_tx = Transaction::Application(SignedTx::Signed(Box::new(
crate::tests::signed_transaction(&mut OsRng, genesis, &key, i),
)));
assert!(blockchain.add_transaction::<N>(true, signed_tx.clone(), validators.clone()));
blockchain.add_transaction::<N>(true, signed_tx.clone(), validators.clone()).unwrap();
mempool.push(signed_tx);

let unsigned_tx = Transaction::Tendermint(
Expand All @@ -485,7 +477,7 @@ async fn block_tx_ordering() {
)
.await,
);
assert!(blockchain.add_transaction::<N>(true, unsigned_tx.clone(), validators.clone()));
blockchain.add_transaction::<N>(true, unsigned_tx.clone(), validators.clone()).unwrap();
mempool.push(unsigned_tx);

let provided_tx =
Expand Down
Loading

0 comments on commit 19e90b2

Please sign in to comment.