Skip to content

Commit

Permalink
use topic number instead of waiting list
Browse files Browse the repository at this point in the history
  • Loading branch information
akildemir committed Sep 30, 2023
1 parent c3589c3 commit 3a4f7de
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 58 deletions.
15 changes: 13 additions & 2 deletions coordinator/src/tributary/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tributary::{
tx::{TendermintTx, decode_evidence},
TendermintNetwork,
},
TransactionKind,
};

use serai_db::DbTxn;
Expand Down Expand Up @@ -121,8 +122,18 @@ pub(crate) async fn handle_new_blocks<
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();

if !tributary.provided_waiting_list_empty() {
return;
for tx in &block.transactions {
// since we know provided txs are the first in the block, we can assume that
// all of them were ok if we haven't returned yet and got a new kind, so we can
// break and continue to scan the block.
let TransactionKind::Provided(order) = tx.kind() else {
break;
};

// make sure we have all the provided txs in this block locally
if !tributary.provided_txs_ok_for_block(&block.hash(), order) {
return;
}
}

handle_block::<_, _, _, _, _, _, P>(
Expand Down
30 changes: 26 additions & 4 deletions coordinator/tributary/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,32 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
db.get(Self::block_after_key(&genesis, block)).map(|bytes| bytes.try_into().unwrap())
}

pub(crate) fn provided_waiting_list_empty(db: &D, genesis: [u8; 32]) -> bool {
let key = ProvidedTransactions::<D, T>::waiting_list_key(genesis);
// TODO: no idea what to call this function
pub(crate) fn provided_txs_ok_for_block(
db: &D,
genesis: &[u8; 32],
block: &[u8; 32],
order: &str,
) -> bool {
let local_key = ProvidedTransactions::<D, T>::local_transaction_no_key(genesis, order);
let on_chain_key = ProvidedTransactions::<D, T>::last_tx_block_order_key(genesis, block, order);
#[allow(clippy::unwrap_or_default)]
db.get(key).unwrap_or(vec![]).is_empty()
let local = u32::from_le_bytes(
db.get(local_key)
.unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec())
.try_into()
.unwrap(),
);
let on_chain = u32::from_le_bytes(
db.get(on_chain_key)
.unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec())
.try_into()
.unwrap(),
);

// TODO: clean up the old used keys? since we don't need block-order -> tx_no
// info once we get pass the block.
local >= on_chain
}

pub(crate) fn tip_from_db(db: &D, genesis: [u8; 32]) -> [u8; 32] {
Expand Down Expand Up @@ -264,7 +286,7 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
match tx.kind() {
TransactionKind::Provided(order) => {
let hash = tx.hash();
self.provided.complete(&mut txn, order, hash);
self.provided.complete(&mut txn, order, self.tip, hash);
txn.put(Self::provided_included_key(&self.genesis, &hash), []);
}
TransactionKind::Unsigned => {
Expand Down
4 changes: 2 additions & 2 deletions coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ impl<D: Db, T: TransactionTrait> TributaryReader<D, T> {
.map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
}

pub fn provided_waiting_list_empty(&self) -> bool {
Blockchain::<D, T>::provided_waiting_list_empty(&self.0, self.1)
pub fn provided_txs_ok_for_block(&self, hash: &[u8; 32], order: &str) -> bool {
Blockchain::<D, T>::provided_txs_ok_for_block(&self.0, &self.1, hash, order)
}

// This isn't static, yet can be read with only minor discrepancy risks
Expand Down
105 changes: 70 additions & 35 deletions coordinator/tributary/src/provided.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,18 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
fn current_provided_key(&self) -> Vec<u8> {
D::key(b"tributary_provided", b"current", self.genesis)
}

pub(crate) fn waiting_list_key(genesis: [u8; 32]) -> Vec<u8> {
D::key(b"tributary_provided", b"waiting_list", genesis)
pub(crate) fn local_transaction_no_key(genesis: &[u8; 32], order: &str) -> Vec<u8> {
D::key(b"tributary_provided", b"local", [genesis, order.as_bytes()].concat())
}
pub(crate) fn on_chain_transaction_no_key(genesis: &[u8; 32], order: &str) -> Vec<u8> {
D::key(b"tributary_provided", b"on_chain", [genesis, order.as_bytes()].concat())
}
pub(crate) fn last_tx_block_order_key(
genesis: &[u8; 32],
hash: &[u8; 32],
order: &str,
) -> Vec<u8> {
D::key(b"tributary_provided", b"on_chain", [genesis, hash, order.as_bytes()].concat())
}

pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self {
Expand Down Expand Up @@ -73,30 +82,31 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
Ok(()) => {}
Err(e) => Err(ProvidedError::InvalidProvided(e))?,
}

let tx_hash = tx.hash();

// get waiting list
let waiting_list_key = Self::waiting_list_key(self.genesis);
// get local and on-chain tx numbers
let local_key = Self::local_transaction_no_key(&self.genesis, order);
let on_chain_key = Self::on_chain_transaction_no_key(&self.genesis, order);
#[allow(clippy::unwrap_or_default)]
let mut waiting_list = self.db.get(&waiting_list_key).unwrap_or(vec![]);

// check whether this tx is a late provide
let exist = waiting_list.chunks_exact(32).position(|h| {
let hash: [u8; 32] = h.try_into().unwrap();
hash == tx_hash
});
if let Some(i) = exist {
// remove from the list since it is now arrived.
let i = i * 32;
assert_eq!(&waiting_list.drain(i .. (i + 32)).collect::<Vec<_>>(), &tx_hash);

let mut txn = self.db.txn();
txn.put(waiting_list_key, waiting_list);
txn.commit();
} else {
// add to mempool if not

let on_chain_tx_no = u32::from_le_bytes(
self
.db
.get(on_chain_key)
.unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec())
.try_into()
.unwrap(),
);
let mut local_tx_no = u32::from_le_bytes(
self
.db
.get(&local_key)
.unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec())
.try_into()
.unwrap(),
);

// try add to mempool if this is a new provided(we haven't seen it on-chain before).
if local_tx_no >= on_chain_tx_no {
// check whether we already have the tx in pool
let provided_key = self.transaction_key(&tx_hash);
if self.db.get(&provided_key).is_some() {
Expand All @@ -119,6 +129,12 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
self.transactions.get_mut(order).unwrap().push_back(tx);
}

// bump the tx number for the local order
local_tx_no += 1;
let mut txn = self.db.txn();
txn.put(local_key, local_tx_no.to_le_bytes());
txn.commit();

Ok(())
}

Expand All @@ -127,20 +143,11 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
&mut self,
txn: &mut D::Transaction<'_>,
order: &'static str,
block: [u8; 32],
tx: [u8; 32],
) {
let txs = self.transactions.get_mut(order);
if txs.as_ref().is_none() ||
(txs.as_ref().is_some() && !txs.as_ref().unwrap().iter().any(|t| t.hash() == tx))
{
// we don't have this tx in our mempool, add it to waiting list.
let waiting_list_key = Self::waiting_list_key(self.genesis);
#[allow(clippy::unwrap_or_default)]
let mut waiting_list = self.db.get(&waiting_list_key).unwrap_or(vec![]);

waiting_list.extend(tx);
txn.put(waiting_list_key, waiting_list);
} else {
if txs.is_some() && txs.as_ref().unwrap().iter().any(|t| t.hash() == tx) {
assert_eq!(txs.unwrap().pop_front().unwrap().hash(), tx);

let current_provided_key = self.current_provided_key();
Expand All @@ -162,5 +169,33 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {

txn.put(current_provided_key, currently_provided);
}

// bump the on-chain tx number.
let on_chain_key = Self::on_chain_transaction_no_key(&self.genesis, order);
let block_order_key = Self::last_tx_block_order_key(&self.genesis, &block, order);
let mut on_chain_tx_no = u32::from_le_bytes(
self
.db
.get(&on_chain_key)
.unwrap_or(u32::try_from(0).unwrap().to_le_bytes().to_vec())
.try_into()
.unwrap(),
);

// TODO: use block hash or block number block-order key?
// - Block hash is easy to use for keys and doesn't require additional api
// but it takes up too much space.
// - Block numbers are not that suitable as keys and requires additional tributary scanner
// api(block_hash -> block_no) but doesn't take much space.

// TODO: do we need both a global save and block-order save?
// Technically it should be enough to save the tx for block-order only, but that requires
// api change in `provide` function to take block hash as well(last block of the chain),
// we should be able pass the last block from where we call it, but we do we want that?

// save it
on_chain_tx_no += 1;
txn.put(on_chain_key, on_chain_tx_no.to_le_bytes());
txn.put(block_order_key, on_chain_tx_no.to_le_bytes());
}
}
97 changes: 84 additions & 13 deletions coordinator/tributary/src/tests/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ fn provided_transaction() {
let validators = Arc::new(Validators::new(genesis, vec![]).unwrap());
let (db, mut blockchain) = new_blockchain::<ProvidedTransaction>(genesis, &[]);

let tx = random_provided_transaction(&mut OsRng);
let tx = random_provided_transaction(&mut OsRng, "order1");

// This should be providable
let mut temp_db = MemDb::new();
Expand All @@ -226,10 +226,10 @@ fn provided_transaction() {
assert_eq!(txs.provide(tx.clone()), Err(ProvidedError::AlreadyProvided));
assert_eq!(
ProvidedTransactions::<_, ProvidedTransaction>::new(temp_db.clone(), genesis).transactions,
HashMap::from([("provided", VecDeque::from([tx.clone()]))]),
HashMap::from([("order1", VecDeque::from([tx.clone()]))]),
);
let mut txn = temp_db.txn();
txs.complete(&mut txn, "provided", tx.hash());
txs.complete(&mut txn, "order1", [0u8; 32], tx.hash());
txn.commit();
assert!(ProvidedTransactions::<_, ProvidedTransaction>::new(db.clone(), genesis)
.transactions
Expand Down Expand Up @@ -260,18 +260,88 @@ fn provided_transaction() {

// case we don't have the block's provided txs in our local
{
let tx = random_provided_transaction(&mut OsRng);
let block = Block::new(blockchain.tip(), vec![tx.clone()], vec![]);
let tx1 = random_provided_transaction(&mut OsRng, "order1");
let tx2 = random_provided_transaction(&mut OsRng, "order1");
let tx3 = random_provided_transaction(&mut OsRng, "order2");
let tx4 = random_provided_transaction(&mut OsRng, "order2");

// add_block DOES NOT fail for unverified provided transactions if told to add them,
// since now we can have them later.
assert!(blockchain.add_block::<N>(&block, vec![], validators.clone()).is_ok());
let block1 = Block::new(blockchain.tip(), vec![tx1.clone(), tx3.clone()], vec![]);
assert!(blockchain.add_block::<N>(&block1, vec![], validators.clone()).is_ok());

// in fact, we can have many blocks that have provided txs that we don't have locally.
let block2 = Block::new(blockchain.tip(), vec![tx2.clone(), tx4.clone()], vec![]);
assert!(blockchain.add_block::<N>(&block2, vec![], validators.clone()).is_ok());

// make sure we won't return ok for the block before we actually got the txs
let TransactionKind::Provided(order) = tx1.kind() else { panic!("tx wasn't provided") };
assert!(!Blockchain::<MemDb, ProvidedTransaction>::provided_txs_ok_for_block(
&db,
&genesis,
&block1.hash(),
order
));
// provide the first tx
blockchain.provide_transaction(tx1).unwrap();
// it should be ok for this order now, since the second tx has different order.
assert!(Blockchain::<MemDb, ProvidedTransaction>::provided_txs_ok_for_block(
&db,
&genesis,
&block1.hash(),
order
));

// make sure waiting list is not empty now
assert!(!Blockchain::<MemDb, ProvidedTransaction>::provided_waiting_list_empty(&db, genesis));
// we provide it now..
blockchain.provide_transaction(tx.clone()).unwrap();
// list has to be empty now
assert!(Blockchain::<MemDb, ProvidedTransaction>::provided_waiting_list_empty(&db, genesis));
// give the second tx
let TransactionKind::Provided(order) = tx3.kind() else { panic!("tx wasn't provided") };
assert!(!Blockchain::<MemDb, ProvidedTransaction>::provided_txs_ok_for_block(
&db,
&genesis,
&block1.hash(),
order
));
blockchain.provide_transaction(tx3).unwrap();
// it should be ok now for the first block
assert!(Blockchain::<MemDb, ProvidedTransaction>::provided_txs_ok_for_block(
&db,
&genesis,
&block1.hash(),
order
));

// provide the second block txs
let TransactionKind::Provided(order) = tx4.kind() else { panic!("tx wasn't provided") };
// not ok yet
assert!(!Blockchain::<MemDb, ProvidedTransaction>::provided_txs_ok_for_block(
&db,
&genesis,
&block2.hash(),
order
));
blockchain.provide_transaction(tx4).unwrap();
// ok now
assert!(Blockchain::<MemDb, ProvidedTransaction>::provided_txs_ok_for_block(
&db,
&genesis,
&block2.hash(),
order
));

// provide the second block txs
let TransactionKind::Provided(order) = tx2.kind() else { panic!("tx wasn't provided") };
assert!(!Blockchain::<MemDb, ProvidedTransaction>::provided_txs_ok_for_block(
&db,
&genesis,
&block2.hash(),
order
));
blockchain.provide_transaction(tx2).unwrap();
assert!(Blockchain::<MemDb, ProvidedTransaction>::provided_txs_ok_for_block(
&db,
&genesis,
&block2.hash(),
order
));
}
}

Expand Down Expand Up @@ -418,7 +488,8 @@ async fn block_tx_ordering() {
assert!(blockchain.add_transaction::<N>(true, unsigned_tx.clone(), validators.clone()));
mempool.push(unsigned_tx);

let provided_tx = SignedTx::Provided(Box::new(random_provided_transaction(&mut OsRng)));
let provided_tx =
SignedTx::Provided(Box::new(random_provided_transaction(&mut OsRng, "order1")));
blockchain.provide_transaction(provided_tx.clone()).unwrap();
provided_txs.push(provided_tx);
}
Expand Down
16 changes: 14 additions & 2 deletions coordinator/tributary/src/tests/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ impl ReadWrite for ProvidedTransaction {

impl Transaction for ProvidedTransaction {
fn kind(&self) -> TransactionKind<'_> {
TransactionKind::Provided("provided")
match self.0[0] {
1 => TransactionKind::Provided("order1"),
2 => TransactionKind::Provided("order2"),
_ => panic!("unknown order"),
}
}

fn hash(&self) -> [u8; 32] {
Expand All @@ -74,9 +78,17 @@ impl Transaction for ProvidedTransaction {
}
}

pub fn random_provided_transaction<R: RngCore + CryptoRng>(rng: &mut R) -> ProvidedTransaction {
pub fn random_provided_transaction<R: RngCore + CryptoRng>(
rng: &mut R,
order: &str,
) -> ProvidedTransaction {
let mut data = vec![0; 512];
rng.fill_bytes(&mut data);
data[0] = match order {
"order1" => 1,
"order2" => 2,
_ => panic!("unknown order"),
};
ProvidedTransaction(data)
}

Expand Down

0 comments on commit 3a4f7de

Please sign in to comment.