Skip to content

Commit

Permalink
move provided handling into coordinator scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
akildemir committed Sep 26, 2023
1 parent d083b1e commit ef24b82
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 15 deletions.
1 change: 1 addition & 0 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub(crate) async fn scan_tributaries<
}
},
spec,
&tributary,
&reader,
)
.await;
Expand Down
7 changes: 6 additions & 1 deletion coordinator/src/tests/tributary/dkg.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::time::Duration;
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng};
Expand Down Expand Up @@ -92,6 +92,7 @@ async fn dkg_test() {
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX in new_processors") },
spec,
&Arc::new(tributary.clone()),
&tributary.reader(),
)
.await;
Expand All @@ -118,6 +119,7 @@ async fn dkg_test() {
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX after Commitments") },
&spec,
&Arc::new(tributaries[0].1.clone()),
&tributaries[0].1.reader(),
)
.await;
Expand Down Expand Up @@ -197,6 +199,7 @@ async fn dkg_test() {
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX after some shares") },
&spec,
&Arc::new(tributaries[0].1.clone()),
&tributaries[0].1.reader(),
)
.await;
Expand Down Expand Up @@ -243,6 +246,7 @@ async fn dkg_test() {
&processors,
|_, _| async { panic!("test tried to publish a new Serai TX") },
&spec,
&Arc::new(tributaries[0].1.clone()),
&tributaries[0].1.reader(),
)
.await;
Expand Down Expand Up @@ -362,6 +366,7 @@ async fn dkg_test() {
}
},
&spec,
&Arc::new(tributaries[0].1.clone()),
&tributaries[0].1.reader(),
)
.await;
Expand Down
37 changes: 33 additions & 4 deletions coordinator/src/tributary/scanner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::future::Future;
use std::sync::Arc;

use zeroize::Zeroizing;

Expand All @@ -12,6 +13,7 @@ use tributary::{
tx::{TendermintTx, decode_evidence},
TendermintNetwork,
},
Tributary, TransactionKind,
};

use serai_db::DbTxn;
Expand Down Expand Up @@ -99,6 +101,7 @@ async fn handle_block<
// TODO2: Trigger any necessary re-attempts
}

#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_new_blocks<
D: Db,
Pro: Processors,
Expand All @@ -114,12 +117,38 @@ pub(crate) async fn handle_new_blocks<
processors: &Pro,
publish_serai_tx: PST,
spec: &TributarySpec,
tributary: &TributaryReader<D, Transaction>,
tributary: &Arc<Tributary<D, Transaction, P>>,
tributary_reader: &TributaryReader<D, Transaction>,
) {
let genesis = tributary.genesis();
let genesis = tributary_reader.genesis();
let mut last_block = db.last_block(genesis);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();
while let Some(next) = tributary_reader.block_after(&last_block) {
let block = tributary_reader.block(&next).unwrap();

// check whether we have the provided txs provided to us from processor.
// As long as we don't have all the provided txs, we just delay the scanning
// of the block until we have all of them.
let provided_transactions = tributary.provided_transactions().await;
for tx in &block.transactions {
if let TransactionKind::Provided(_) = tx.kind() {
let tx_hash = tx.hash();
if !provided_transactions.chunks_exact(32).any(|hash| {
let h: [u8; 32] = hash.try_into().unwrap();
h == tx_hash
}) {
return;
}
}
}

// if we come here that means we have all the provided txs
// that was in the block, so we acknowledge them.
for tx in &block.transactions {
if let TransactionKind::Provided(order) = tx.kind() {
tributary.complete_provided(order, tx.hash()).await;
}
}

handle_block::<_, _, _, _, _, _, P>(
db,
key,
Expand Down
22 changes: 18 additions & 4 deletions coordinator/tributary/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
self.provided.provide(tx)
}

pub(crate) fn complete_provided(&mut self, order: &str, hash: [u8; 32]) {
let mut txn = self.db.as_mut().unwrap().txn();
self.provided.complete(&mut txn, order, hash);
txn.commit();
}

pub(crate) fn provided_transactions(&self) -> Vec<u8> {
self.provided.currently_provided()
}

/// Returns the next nonce for signing, or None if they aren't a participant.
pub(crate) fn next_nonce(&self, key: <Ristretto as Ciphersuite>::G) -> Option<u32> {
Some(self.next_nonces.get(&key).cloned()?.max(self.mempool.next_nonce(&key).unwrap_or(0)))
Expand Down Expand Up @@ -217,7 +227,11 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
commit: Vec<u8>,
schema: N::SignatureScheme,
) -> Result<(), BlockError> {
self.verify_block::<N>(block, schema)?;
// verify the block, ignore the NonLocalProvided error.
let res = self.verify_block::<N>(block, schema);
if res.as_ref().is_err_and(|e| !matches!(e, BlockError::NonLocalProvided(_))) {
return res;
}

log::info!(
"adding block {} to tributary {} with {} TXs",
Expand Down Expand Up @@ -248,9 +262,9 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {

for tx in &block.transactions {
match tx.kind() {
TransactionKind::Provided(order) => {
self.provided.complete(&mut txn, order, tx.hash());
}
// We used to call the provided.complete() here
// but that is now moved to coordinator scanner.
TransactionKind::Provided(_) => {}
TransactionKind::Unsigned => {
let hash = tx.hash();
// Save as included on chain
Expand Down
8 changes: 8 additions & 0 deletions coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
self.network.blockchain.write().await.provide_transaction(tx)
}

pub async fn complete_provided(&self, order: &str, hash: [u8; 32]) {
self.network.blockchain.write().await.complete_provided(order, hash)
}

pub async fn provided_transactions(&self) -> Vec<u8> {
self.network.blockchain.read().await.provided_transactions()
}

pub async fn next_nonce(&self, signer: <Ristretto as Ciphersuite>::G) -> Option<u32> {
self.network.blockchain.read().await.next_nonce(signer)
}
Expand Down
13 changes: 7 additions & 6 deletions coordinator/tributary/src/provided.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,7 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
}

/// Complete a provided transaction, no longer proposing it nor voting for its validity.
pub(crate) fn complete(
&mut self,
txn: &mut D::Transaction<'_>,
order: &'static str,
tx: [u8; 32],
) {
pub(crate) fn complete(&mut self, txn: &mut D::Transaction<'_>, order: &str, tx: [u8; 32]) {
assert_eq!(self.transactions.get_mut(order).unwrap().pop_front().unwrap().hash(), tx);

let current_provided_key = self.current_provided_key();
Expand All @@ -121,4 +116,10 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {

txn.put(current_provided_key, currently_provided);
}

pub(crate) fn currently_provided(&self) -> Vec<u8> {
let current_provided_key = self.current_provided_key();
#[allow(clippy::unwrap_or_default)]
self.db.get(current_provided_key).unwrap_or(vec![])
}
}

0 comments on commit ef24b82

Please sign in to comment.