Skip to content

Commit

Permalink
handle not locally provided txs
Browse files Browse the repository at this point in the history
  • Loading branch information
akildemir committed Sep 28, 2023
1 parent d083b1e commit c3589c3
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 74 deletions.
5 changes: 5 additions & 0 deletions coordinator/src/tributary/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ pub(crate) async fn handle_new_blocks<
let mut last_block = db.last_block(genesis);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();

if !tributary.provided_waiting_list_empty() {
return;
}

handle_block::<_, _, _, _, _, _, P>(
db,
key,
Expand Down
17 changes: 15 additions & 2 deletions coordinator/tributary/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ pub enum BlockError {
/// An unsigned transaction which was already added to the chain was present again.
#[error("an unsigned transaction which was already added to the chain was present again")]
UnsignedAlreadyIncluded,
/// Transactions weren't ordered as expected (Provided, followed by Unsigned, folowed by Signed).
/// A provided transaction which was already added to the chain was present again.
#[error("an provided transaction which was already added to the chain was present again")]
ProvidedAlreadyIncluded,
/// Transactions weren't ordered as expected (Provided, followed by Unsigned, followed by Signed).
#[error("transactions weren't ordered as expected (Provided, Unsigned, Signed)")]
WrongTransactionOrder,
/// The block had a provided transaction this validator has yet to be provided.
Expand Down Expand Up @@ -175,6 +178,8 @@ impl<T: TransactionTrait> Block<T> {
schema: N::SignatureScheme,
commit: impl Fn(u32) -> Option<Commit<N::SignatureScheme>>,
unsigned_in_chain: impl Fn([u8; 32]) -> bool,
provided_in_chain: impl Fn([u8; 32]) -> bool, // TODO: merge this with unsigned_on_chain?
ignore_non_local_provided: bool,
) -> Result<(), BlockError> {
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Order {
Expand Down Expand Up @@ -209,9 +214,17 @@ impl<T: TransactionTrait> Block<T> {

let current_tx_order = match tx.kind() {
TransactionKind::Provided(order) => {
if provided_in_chain(tx_hash) {
Err(BlockError::ProvidedAlreadyIncluded)?;
}

let Some(local) = locally_provided.get_mut(order).and_then(|deque| deque.pop_front())
else {
Err(BlockError::NonLocalProvided(txs.pop().unwrap()))?
if !ignore_non_local_provided {
Err(BlockError::NonLocalProvided(txs.pop().unwrap()))?
} else {
continue;
}
};
// Since this was a provided TX, it must be an application TX
let Transaction::Application(tx) = tx else {
Expand Down
22 changes: 19 additions & 3 deletions coordinator/tributary/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
fn unsigned_included_key(genesis: &[u8], hash: &[u8; 32]) -> Vec<u8> {
D::key(b"tributary_blockchain", b"unsigned_included", [genesis, hash].concat())
}
fn provided_included_key(genesis: &[u8], hash: &[u8; 32]) -> Vec<u8> {
D::key(b"tributary_blockchain", b"provided_included", [genesis, hash].concat())
}
fn next_nonce_key(&self, signer: &<Ristretto as Ciphersuite>::G) -> Vec<u8> {
D::key(
b"tributary_blockchain",
Expand Down Expand Up @@ -136,6 +139,12 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
db.get(Self::block_after_key(&genesis, block)).map(|bytes| bytes.try_into().unwrap())
}

pub(crate) fn provided_waiting_list_empty(db: &D, genesis: [u8; 32]) -> bool {
let key = ProvidedTransactions::<D, T>::waiting_list_key(genesis);
#[allow(clippy::unwrap_or_default)]
db.get(key).unwrap_or(vec![]).is_empty()
}

pub(crate) fn tip_from_db(db: &D, genesis: [u8; 32]) -> [u8; 32] {
db.get(Self::tip_key(genesis)).map(|bytes| bytes.try_into().unwrap()).unwrap_or(genesis)
}
Expand Down Expand Up @@ -182,18 +191,21 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
self.mempool.block(&self.next_nonces, unsigned_in_chain),
);
// build_block should not return invalid blocks
self.verify_block::<N>(&block, schema).unwrap();
self.verify_block::<N>(&block, schema, false).unwrap();
block
}

pub(crate) fn verify_block<N: Network>(
&self,
block: &Block<T>,
schema: N::SignatureScheme,
ignore_non_local_provided: bool,
) -> Result<(), BlockError> {
let db = self.db.as_ref().unwrap();
let unsigned_in_chain =
|hash: [u8; 32]| db.get(Self::unsigned_included_key(&self.genesis, &hash)).is_some();
let provided_in_chain =
|hash: [u8; 32]| db.get(Self::provided_included_key(&self.genesis, &hash)).is_some();
let commit = |block: u32| -> Option<Commit<N::SignatureScheme>> {
let commit = self.commit_by_block_number(block)?;
// commit has to be valid if it is coming from our db
Expand All @@ -207,6 +219,8 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
schema,
&commit,
unsigned_in_chain,
provided_in_chain,
ignore_non_local_provided,
)
}

Expand All @@ -217,7 +231,7 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
commit: Vec<u8>,
schema: N::SignatureScheme,
) -> Result<(), BlockError> {
self.verify_block::<N>(block, schema)?;
self.verify_block::<N>(block, schema, true)?;

log::info!(
"adding block {} to tributary {} with {} TXs",
Expand Down Expand Up @@ -249,7 +263,9 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
for tx in &block.transactions {
match tx.kind() {
TransactionKind::Provided(order) => {
self.provided.complete(&mut txn, order, tx.hash());
let hash = tx.hash();
self.provided.complete(&mut txn, order, hash);
txn.put(Self::provided_included_key(&self.genesis, &hash), []);
}
TransactionKind::Unsigned => {
let hash = tx.hash();
Expand Down
4 changes: 4 additions & 0 deletions coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ impl<D: Db, T: TransactionTrait> TributaryReader<D, T> {
.map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
}

pub fn provided_waiting_list_empty(&self) -> bool {
Blockchain::<D, T>::provided_waiting_list_empty(&self.0, self.1)
}

// This isn't static, yet can be read with only minor discrepancy risks
pub fn tip(&self) -> [u8; 32] {
Blockchain::<D, T>::tip_from_db(&self.0, self.1)
Expand Down
104 changes: 73 additions & 31 deletions coordinator/tributary/src/provided.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
D::key(b"tributary_provided", b"current", self.genesis)
}

pub(crate) fn waiting_list_key(genesis: [u8; 32]) -> Vec<u8> {
D::key(b"tributary_provided", b"waiting_list", genesis)
}

pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self {
let mut res = ProvidedTransactions { db, genesis, transactions: HashMap::new() };

Expand Down Expand Up @@ -71,25 +75,50 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
}

let tx_hash = tx.hash();
let provided_key = self.transaction_key(&tx_hash);
if self.db.get(&provided_key).is_some() {
Err(ProvidedError::AlreadyProvided)?;
}

let current_provided_key = self.current_provided_key();
// get waiting list
let waiting_list_key = Self::waiting_list_key(self.genesis);
#[allow(clippy::unwrap_or_default)]
let mut currently_provided = self.db.get(&current_provided_key).unwrap_or(vec![]);
let mut waiting_list = self.db.get(&waiting_list_key).unwrap_or(vec![]);

// check whether this tx is a late provide
let exist = waiting_list.chunks_exact(32).position(|h| {
let hash: [u8; 32] = h.try_into().unwrap();
hash == tx_hash
});
if let Some(i) = exist {
// remove from the list since it is now arrived.
let i = i * 32;
assert_eq!(&waiting_list.drain(i .. (i + 32)).collect::<Vec<_>>(), &tx_hash);

let mut txn = self.db.txn();
txn.put(waiting_list_key, waiting_list);
txn.commit();
} else {
// add to mempool if not

// check whether we already have the tx in pool
let provided_key = self.transaction_key(&tx_hash);
if self.db.get(&provided_key).is_some() {
Err(ProvidedError::AlreadyProvided)?;
}

let mut txn = self.db.txn();
txn.put(provided_key, tx.serialize());
currently_provided.extend(tx_hash);
txn.put(current_provided_key, currently_provided);
txn.commit();
let current_provided_key = self.current_provided_key();
#[allow(clippy::unwrap_or_default)]
let mut currently_provided = self.db.get(&current_provided_key).unwrap_or(vec![]);

if self.transactions.get(order).is_none() {
self.transactions.insert(order, VecDeque::new());
let mut txn = self.db.txn();
txn.put(provided_key, tx.serialize());
currently_provided.extend(tx_hash);
txn.put(current_provided_key, currently_provided);
txn.commit();

if self.transactions.get(order).is_none() {
self.transactions.insert(order, VecDeque::new());
}
self.transactions.get_mut(order).unwrap().push_back(tx);
}
self.transactions.get_mut(order).unwrap().push_back(tx);

Ok(())
}

Expand All @@ -100,25 +129,38 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
order: &'static str,
tx: [u8; 32],
) {
assert_eq!(self.transactions.get_mut(order).unwrap().pop_front().unwrap().hash(), tx);

let current_provided_key = self.current_provided_key();
let mut currently_provided = txn.get(&current_provided_key).unwrap();

// Find this TX's hash
let mut i = 0;
loop {
if currently_provided[i .. (i + 32)] == tx {
assert_eq!(&currently_provided.drain(i .. (i + 32)).collect::<Vec<_>>(), &tx);
break;
let txs = self.transactions.get_mut(order);
if txs.as_ref().is_none() ||
(txs.as_ref().is_some() && !txs.as_ref().unwrap().iter().any(|t| t.hash() == tx))
{
// we don't have this tx in our mempool, add it to waiting list.
let waiting_list_key = Self::waiting_list_key(self.genesis);
#[allow(clippy::unwrap_or_default)]
let mut waiting_list = self.db.get(&waiting_list_key).unwrap_or(vec![]);

waiting_list.extend(tx);
txn.put(waiting_list_key, waiting_list);
} else {
assert_eq!(txs.unwrap().pop_front().unwrap().hash(), tx);

let current_provided_key = self.current_provided_key();
let mut currently_provided = txn.get(&current_provided_key).unwrap();

// Find this TX's hash
let mut i = 0;
loop {
if currently_provided[i .. (i + 32)] == tx {
assert_eq!(&currently_provided.drain(i .. (i + 32)).collect::<Vec<_>>(), &tx);
break;
}

i += 32;
if i >= currently_provided.len() {
panic!("couldn't find completed TX in currently provided");
}
}

i += 32;
if i >= currently_provided.len() {
panic!("couldn't find completed TX in currently provided");
}
txn.put(current_provided_key, currently_provided);
}

txn.put(current_provided_key, currently_provided);
}
}
11 changes: 7 additions & 4 deletions coordinator/tributary/src/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,15 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> {
let block =
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
self.blockchain.read().await.verify_block::<Self>(&block, self.signature_scheme()).map_err(
|e| match e {
self
.blockchain
.read()
.await
.verify_block::<Self>(&block, self.signature_scheme(), false)
.map_err(|e| match e {
BlockError::NonLocalProvided(_) => TendermintBlockError::Temporal,
_ => TendermintBlockError::Fatal,
},
)
})
}

async fn add_block(
Expand Down
4 changes: 4 additions & 0 deletions coordinator/tributary/src/tests/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ fn empty_block() {
validators,
commit,
unsigned_in_chain,
unsigned_in_chain,
false,
)
.unwrap();
}
Expand Down Expand Up @@ -123,6 +125,8 @@ fn duplicate_nonces() {
validators.clone(),
commit,
unsigned_in_chain,
unsigned_in_chain,
false,
);
if i == 1 {
res.unwrap();
Expand Down
Loading

0 comments on commit c3589c3

Please sign in to comment.