Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve provided handling #381

Merged
merged 5 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,18 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
log::trace!("providing transaction {}", hex::encode(tx.hash()));
let res = tributary.tributary.provide_transaction(tx).await;
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
if res == Err(ProvidedError::LocalMismatchesOnChain) {
// Spin, since this is a crit for this Tributary
loop {
log::error!(
"{}. tributary: {}, provided: SubstrateBlock({})",
"tributary added distinct provided to delayed locally provided TX",
hex::encode(tributary.spec.genesis()),
block,
);
sleep(Duration::from_secs(60)).await;
}
}
panic!("provided an invalid transaction: {res:?}");
}
}
Expand Down Expand Up @@ -956,8 +968,20 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
match tx.kind() {
TransactionKind::Provided(_) => {
log::trace!("providing transaction {}", hex::encode(tx.hash()));
let res = tributary.provide_transaction(tx).await;
let res = tributary.provide_transaction(tx.clone()).await;
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
if res == Err(ProvidedError::LocalMismatchesOnChain) {
// Spin, since this is a crit for this Tributary
loop {
log::error!(
"{}. tributary: {}, provided: {:?}",
"tributary added distinct provided to delayed locally provided TX",
hex::encode(spec.genesis()),
&tx,
);
sleep(Duration::from_secs(60)).await;
}
}
panic!("provided an invalid transaction: {res:?}");
}
}
Expand Down
16 changes: 15 additions & 1 deletion coordinator/src/tributary/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ciphersuite::{Ciphersuite, Ristretto};
use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded};

use tributary::{
Transaction as TributaryTransaction, Block, TributaryReader,
TransactionKind, Transaction as TributaryTransaction, Block, TributaryReader,
tendermint::{
tx::{TendermintTx, decode_evidence},
TendermintNetwork,
Expand Down Expand Up @@ -120,6 +120,20 @@ pub(crate) async fn handle_new_blocks<
let mut last_block = db.last_block(genesis);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();

// Make sure we have all of the provided transactions for this block
for tx in &block.transactions {
// Provided TXs will appear first in the Block, so we can break after we hit a non-Provided
let TransactionKind::Provided(order) = tx.kind() else {
break;
};

// make sure we have all the provided txs in this block locally
if !tributary.locally_provided_txs_in_block(&block.hash(), order) {
return;
}
}

handle_block::<_, _, _, _, _, _, P>(
db,
key,
Expand Down
35 changes: 19 additions & 16 deletions coordinator/tributary/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ pub enum BlockError {
/// An unsigned transaction which was already added to the chain was present again.
#[error("an unsigned transaction which was already added to the chain was present again")]
UnsignedAlreadyIncluded,
/// Transactions weren't ordered as expected (Provided, followed by Unsigned, folowed by Signed).
/// A provided transaction which was already added to the chain was present again.
#[error("an provided transaction which was already added to the chain was present again")]
ProvidedAlreadyIncluded,
/// Transactions weren't ordered as expected (Provided, followed by Unsigned, followed by Signed).
#[error("transactions weren't ordered as expected (Provided, Unsigned, Signed)")]
WrongTransactionOrder,
/// The block had a provided transaction this validator has yet to be provided.
Expand Down Expand Up @@ -175,6 +178,8 @@ impl<T: TransactionTrait> Block<T> {
schema: N::SignatureScheme,
commit: impl Fn(u32) -> Option<Commit<N::SignatureScheme>>,
unsigned_in_chain: impl Fn([u8; 32]) -> bool,
provided_in_chain: impl Fn([u8; 32]) -> bool, // TODO: merge this with unsigned_on_chain?
allow_non_local_provided: bool,
) -> Result<(), BlockError> {
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Order {
Expand Down Expand Up @@ -209,17 +214,21 @@ impl<T: TransactionTrait> Block<T> {

let current_tx_order = match tx.kind() {
TransactionKind::Provided(order) => {
let Some(local) = locally_provided.get_mut(order).and_then(|deque| deque.pop_front())
else {
Err(BlockError::NonLocalProvided(txs.pop().unwrap()))?
};
// Since this was a provided TX, it must be an application TX
let Transaction::Application(tx) = tx else {
if provided_in_chain(tx_hash) {
Err(BlockError::ProvidedAlreadyIncluded)?;
}

if let Some(local) = locally_provided.get_mut(order).and_then(|deque| deque.pop_front()) {
// Since this was a provided TX, it must be an application TX
let Transaction::Application(tx) = tx else {
Err(BlockError::NonLocalProvided(txs.pop().unwrap()))?
};
if tx != &local {
Err(BlockError::DistinctProvided)?;
}
} else if !allow_non_local_provided {
Err(BlockError::NonLocalProvided(txs.pop().unwrap()))?
};
if tx != &local {
Err(BlockError::DistinctProvided)?;
}

Order::Provided
}
Expand All @@ -241,12 +250,6 @@ impl<T: TransactionTrait> Block<T> {
}
last_tx_order = current_tx_order;

if current_tx_order == Order::Provided {
// We don't need to call verify_transaction since we did when we locally provided this
// transaction. Since it's identical, it must be valid
continue;
}

// TODO: should we modify the verify_transaction to take `Transaction<T>` or
// use this pattern of verifying tendermint Txs and app txs differently?
match tx {
Expand Down
33 changes: 30 additions & 3 deletions coordinator/tributary/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
fn unsigned_included_key(genesis: &[u8], hash: &[u8; 32]) -> Vec<u8> {
D::key(b"tributary_blockchain", b"unsigned_included", [genesis, hash].concat())
}
fn provided_included_key(genesis: &[u8], hash: &[u8; 32]) -> Vec<u8> {
D::key(b"tributary_blockchain", b"provided_included", [genesis, hash].concat())
}
fn next_nonce_key(&self, signer: &<Ristretto as Ciphersuite>::G) -> Vec<u8> {
D::key(
b"tributary_blockchain",
Expand Down Expand Up @@ -136,6 +139,23 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
db.get(Self::block_after_key(&genesis, block)).map(|bytes| bytes.try_into().unwrap())
}

pub(crate) fn locally_provided_txs_in_block(
db: &D,
genesis: &[u8; 32],
block: &[u8; 32],
order: &str,
) -> bool {
let local_key = ProvidedTransactions::<D, T>::locally_provided_quantity_key(genesis, order);
let local =
db.get(local_key).map(|bytes| u32::from_le_bytes(bytes.try_into().unwrap())).unwrap_or(0);
let block_key =
ProvidedTransactions::<D, T>::block_provided_quantity_key(genesis, block, order);
let block =
db.get(block_key).map(|bytes| u32::from_le_bytes(bytes.try_into().unwrap())).unwrap_or(0);

local >= block
}

pub(crate) fn tip_from_db(db: &D, genesis: [u8; 32]) -> [u8; 32] {
db.get(Self::tip_key(genesis)).map(|bytes| bytes.try_into().unwrap()).unwrap_or(genesis)
}
Expand Down Expand Up @@ -182,18 +202,21 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
self.mempool.block(&self.next_nonces, unsigned_in_chain),
);
// build_block should not return invalid blocks
self.verify_block::<N>(&block, schema).unwrap();
self.verify_block::<N>(&block, schema, false).unwrap();
block
}

pub(crate) fn verify_block<N: Network>(
&self,
block: &Block<T>,
schema: N::SignatureScheme,
allow_non_local_provided: bool,
) -> Result<(), BlockError> {
let db = self.db.as_ref().unwrap();
let unsigned_in_chain =
|hash: [u8; 32]| db.get(Self::unsigned_included_key(&self.genesis, &hash)).is_some();
let provided_in_chain =
|hash: [u8; 32]| db.get(Self::provided_included_key(&self.genesis, &hash)).is_some();
let commit = |block: u32| -> Option<Commit<N::SignatureScheme>> {
let commit = self.commit_by_block_number(block)?;
// commit has to be valid if it is coming from our db
Expand All @@ -207,6 +230,8 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
schema,
&commit,
unsigned_in_chain,
provided_in_chain,
allow_non_local_provided,
)
}

Expand All @@ -217,7 +242,7 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
commit: Vec<u8>,
schema: N::SignatureScheme,
) -> Result<(), BlockError> {
self.verify_block::<N>(block, schema)?;
self.verify_block::<N>(block, schema, true)?;

log::info!(
"adding block {} to tributary {} with {} TXs",
Expand Down Expand Up @@ -249,7 +274,9 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
for tx in &block.transactions {
match tx.kind() {
TransactionKind::Provided(order) => {
self.provided.complete(&mut txn, order, tx.hash());
let hash = tx.hash();
self.provided.complete(&mut txn, order, self.tip, hash);
txn.put(Self::provided_included_key(&self.genesis, &hash), []);
}
TransactionKind::Unsigned => {
let hash = tx.hash();
Expand Down
4 changes: 4 additions & 0 deletions coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,10 @@ impl<D: Db, T: TransactionTrait> TributaryReader<D, T> {
.map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
}

pub fn locally_provided_txs_in_block(&self, hash: &[u8; 32], order: &str) -> bool {
Blockchain::<D, T>::locally_provided_txs_in_block(&self.0, &self.1, hash, order)
}

// This isn't static, yet can be read with only minor discrepancy risks
pub fn tip(&self) -> [u8; 32] {
Blockchain::<D, T>::tip_from_db(&self.0, self.1)
Expand Down
Loading