From 6b7c1da9c28cbc057c5b25cc2a2e525bd88748ad Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 7 Dec 2023 05:02:16 -0500 Subject: [PATCH 1/7] coordinator/src/db.rs db macro implimentation --- coordinator/src/db.rs | 212 +++++++++------------------ coordinator/src/main.rs | 70 ++++----- coordinator/src/substrate/mod.rs | 12 +- coordinator/src/tributary/scanner.rs | 2 +- 4 files changed, 110 insertions(+), 186 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index d8ec4356e..f680cef55 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -1,11 +1,9 @@ -use core::marker::PhantomData; - use blake2::{ digest::{consts::U32, Digest}, Blake2b, }; -use scale::{Encode, Decode}; +use scale::Encode; use serai_client::{ primitives::NetworkId, validator_sets::primitives::{Session, ValidatorSet}, @@ -17,31 +15,28 @@ pub use serai_db::*; use ::tributary::ReadWrite; use crate::tributary::{TributarySpec, Transaction, scanner::RecognizedIdType}; -#[derive(Debug)] -pub struct MainDb(PhantomData); -impl MainDb { - fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"coordinator_main", dst, key) - } - - fn handled_message_key(network: NetworkId, id: u64) -> Vec { - Self::main_key(b"handled_message", (network, id).encode()) - } - pub fn save_handled_message(txn: &mut D::Transaction<'_>, network: NetworkId, id: u64) { - txn.put(Self::handled_message_key(network, id), []); - } - pub fn handled_message(getter: &G, network: NetworkId, id: u64) -> bool { - getter.get(Self::handled_message_key(network, id)).is_some() - } - - fn active_tributaries_key() -> Vec { - Self::main_key(b"active_tributaries", []) - } - fn retired_tributary_key(set: ValidatorSet) -> Vec { - Self::main_key(b"retired_tributary", set.encode()) - } +create_db!( + MainDb { + HandledMessageDb: (network: NetworkId, id: u64) -> Vec, + InTributaryDb: (set: ValidatorSet) -> Vec, + ActiveTributaryDb: () -> Vec, + RetiredTributaryDb: (set: ValidatorSet) -> Vec, + SignedTransactionDb: (key: u32) -> Vec, + FirstPreprocessDb: + (network: NetworkId, id_type: RecognizedIdType, id: &Vec) -> Vec>, + LastRecievedBatchDb: (network: NetworkId) -> u32, + ExpectedBatchDb: (network: NetworkId, id: u32) -> [u8; 32], + BatchDb: (network: NetworkId, id: u32) -> SignedBatch, + LastVerifiedBatchDb: (network: NetworkId) -> u32, + HandoverBatchDb: (set: ValidatorSet) -> u32, + LookupHandoverBatchDb: (network: NetworkId, batch: u32) -> Session, + QueuedBatchesDb: (set: ValidatorSet) -> Vec + } +); + +impl ActiveTributaryDb { pub fn active_tributaries(getter: &G) -> (Vec, Vec) { - let bytes = getter.get(Self::active_tributaries_key()).unwrap_or(vec![]); + let bytes = getter.get(Self::key()).unwrap_or_default(); let mut bytes_ref: &[u8] = bytes.as_ref(); let mut tributaries = vec![]; @@ -51,19 +46,8 @@ impl MainDb { (bytes, tributaries) } - pub fn add_participating_in_tributary(txn: &mut D::Transaction<'_>, spec: &TributarySpec) { - let key = Self::active_tributaries_key(); - let (mut existing_bytes, existing) = Self::active_tributaries(txn); - for tributary in &existing { - if tributary == spec { - return; - } - } - spec.write(&mut existing_bytes).unwrap(); - txn.put(key, existing_bytes); - } - pub fn retire_tributary(txn: &mut D::Transaction<'_>, set: ValidatorSet) { + pub fn retire_tributary(txn: &mut impl DbTxn, set: ValidatorSet) { let mut active = Self::active_tributaries(txn).1; for i in 0 .. active.len() { if active[i].set() == set { @@ -76,139 +60,77 @@ impl MainDb { for active in active { active.write(&mut bytes).unwrap(); } - txn.put(Self::active_tributaries_key(), bytes); - txn.put(Self::retired_tributary_key(set), []); - } - pub fn is_tributary_retired(getter: &G, set: ValidatorSet) -> bool { - getter.get(Self::retired_tributary_key(set)).is_some() + txn.put(Self::key(), bytes); + txn.put(RetiredTributaryDb::key(set), []); } +} - fn signed_transaction_key(nonce: u32) -> Vec { - Self::main_key(b"signed_transaction", nonce.to_le_bytes()) - } - pub fn save_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32, tx: Transaction) { - txn.put(Self::signed_transaction_key(nonce), tx.serialize()); +pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec) { + txn.put(InTributaryDb::key(spec.set()), []); + + let key = ActiveTributaryDb::key(); + let (mut existing_bytes, existing) = ActiveTributaryDb::active_tributaries(txn); + for tributary in &existing { + if tributary == spec { + return; + } } - pub fn take_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32) -> Option { - let key = Self::signed_transaction_key(nonce); + + spec.write(&mut existing_bytes).unwrap(); + txn.put(key, existing_bytes); +} + +impl SignedTransactionDb { + pub fn take_signed_transaction(txn: &mut impl DbTxn, nonce: u32) -> Option { + let key = Self::key(nonce); let res = txn.get(&key).map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); if res.is_some() { txn.del(&key); } res } +} - fn first_preprocess_key(network: NetworkId, id_type: RecognizedIdType, id: &[u8]) -> Vec { - Self::main_key(b"first_preprocess", (network, id_type, id).encode()) - } +impl FirstPreprocessDb { pub fn save_first_preprocess( - txn: &mut D::Transaction<'_>, + txn: &mut impl DbTxn, network: NetworkId, id_type: RecognizedIdType, id: &[u8], preprocess: Vec>, ) { - let preprocess = preprocess.encode(); - let key = Self::first_preprocess_key(network, id_type, id); - if let Some(existing) = txn.get(&key) { + if let Some(existing) = FirstPreprocessDb::get(txn, network, id_type, &id.to_vec()) { assert_eq!(existing, preprocess, "saved a distinct first preprocess"); return; } - txn.put(key, preprocess); - } - pub fn first_preprocess( - getter: &G, - network: NetworkId, - id_type: RecognizedIdType, - id: &[u8], - ) -> Option>> { - getter - .get(Self::first_preprocess_key(network, id_type, id)) - .map(|bytes| Vec::<_>::decode(&mut bytes.as_slice()).unwrap()) - } - - fn last_received_batch_key(network: NetworkId) -> Vec { - Self::main_key(b"last_received_batch", network.encode()) - } - fn expected_batch_key(network: NetworkId, id: u32) -> Vec { - Self::main_key(b"expected_batch", (network, id).encode()) - } - pub fn save_expected_batch(txn: &mut D::Transaction<'_>, batch: &Batch) { - txn.put(Self::last_received_batch_key(batch.network), batch.id.to_le_bytes()); - txn.put( - Self::expected_batch_key(batch.network, batch.id), - Blake2b::::digest(batch.instructions.encode()), - ); - } - pub fn last_received_batch(getter: &G, network: NetworkId) -> Option { - getter - .get(Self::last_received_batch_key(network)) - .map(|id| u32::from_le_bytes(id.try_into().unwrap())) - } - pub fn expected_batch(getter: &G, network: NetworkId, id: u32) -> Option<[u8; 32]> { - getter.get(Self::expected_batch_key(network, id)).map(|batch| batch.try_into().unwrap()) - } - - fn batch_key(network: NetworkId, id: u32) -> Vec { - Self::main_key(b"batch", (network, id).encode()) - } - pub fn save_batch(txn: &mut D::Transaction<'_>, batch: SignedBatch) { - txn.put(Self::batch_key(batch.batch.network, batch.batch.id), batch.encode()); - } - pub fn batch(getter: &G, network: NetworkId, id: u32) -> Option { - getter - .get(Self::batch_key(network, id)) - .map(|batch| SignedBatch::decode(&mut batch.as_ref()).unwrap()) - } - - fn last_verified_batch_key(network: NetworkId) -> Vec { - Self::main_key(b"last_verified_batch", network.encode()) - } - pub fn save_last_verified_batch(txn: &mut D::Transaction<'_>, network: NetworkId, id: u32) { - txn.put(Self::last_verified_batch_key(network), id.to_le_bytes()); - } - pub fn last_verified_batch(getter: &G, network: NetworkId) -> Option { - getter - .get(Self::last_verified_batch_key(network)) - .map(|id| u32::from_le_bytes(id.try_into().unwrap())) + FirstPreprocessDb::set(txn, network, id_type, &id.to_vec(), &preprocess); } +} - fn handover_batch_key(set: ValidatorSet) -> Vec { - Self::main_key(b"handover_batch", set.encode()) - } - fn lookup_handover_batch_key(network: NetworkId, batch: u32) -> Vec { - Self::main_key(b"lookup_handover_batch", (network, batch).encode()) - } - pub fn set_handover_batch(txn: &mut D::Transaction<'_>, set: ValidatorSet, batch: u32) { - txn.put(Self::handover_batch_key(set), batch.to_le_bytes()); - txn.put(Self::lookup_handover_batch_key(set.network, batch), set.session.0.to_le_bytes()); - } - pub fn handover_batch(getter: &G, set: ValidatorSet) -> Option { - getter.get(Self::handover_batch_key(set)).map(|id| u32::from_le_bytes(id.try_into().unwrap())) - } - pub fn is_handover_batch( - getter: &G, - network: NetworkId, - batch: u32, - ) -> Option { - getter.get(Self::lookup_handover_batch_key(network, batch)).map(|session| ValidatorSet { - network, - session: Session(u32::from_le_bytes(session.try_into().unwrap())), - }) +impl ExpectedBatchDb { + pub fn save_expected_batch(txn: &mut impl DbTxn, batch: &Batch) { + LastRecievedBatchDb::set(txn, batch.network, &batch.id); + let hash: [u8; 32] = Blake2b::::digest(batch.instructions.encode()).into(); + Self::set(txn, batch.network, batch.id, &hash); } +} - fn queued_batches_key(set: ValidatorSet) -> Vec { - Self::main_key(b"queued_batches", set.encode()) +impl HandoverBatchDb { + pub fn set_handover_batch(txn: &mut impl DbTxn, set: ValidatorSet, batch: u32) { + Self::set(txn, set, &batch); + LookupHandoverBatchDb::set(txn, set.network, batch, &set.session); } - pub fn queue_batch(txn: &mut D::Transaction<'_>, set: ValidatorSet, batch: Transaction) { - let key = Self::queued_batches_key(set); - let mut batches = txn.get(&key).unwrap_or(vec![]); +} +impl QueuedBatchesDb { + pub fn queue(txn: &mut impl DbTxn, set: ValidatorSet, batch: Transaction) { + let key = Self::key(set); + let mut batches = txn.get(&key).unwrap_or_default(); batches.extend(batch.serialize()); txn.put(&key, batches); } - pub fn take_queued_batches(txn: &mut D::Transaction<'_>, set: ValidatorSet) -> Vec { - let key = Self::queued_batches_key(set); - let batches_vec = txn.get(&key).unwrap_or(vec![]); + pub fn take(txn: &mut impl DbTxn, set: ValidatorSet) -> Vec { + let key = Self::key(set); + let batches_vec = txn.get(&key).unwrap_or_default(); txn.del(&key); let mut batches: &[u8] = &batches_vec; diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 4e3c8a9fe..ae85e9720 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -39,7 +39,7 @@ mod tributary; use crate::tributary::{TributarySpec, SignData, Transaction, scanner::RecognizedIdType, PlanIds}; mod db; -use db::MainDb; +use db::*; mod p2p; pub use p2p::*; @@ -83,7 +83,7 @@ async fn add_tributary( tributaries: &broadcast::Sender>, spec: TributarySpec, ) { - if MainDb::::is_tributary_retired(&db, spec.set()) { + if RetiredTributaryDb::get(&db, spec.set()).is_some() { log::info!("not adding tributary {:?} since it's been retired", spec.set()); } @@ -138,7 +138,7 @@ async fn publish_signed_transaction( // Safe as we should deterministically create transactions, meaning if this is already on-disk, // it's what we're saving now - MainDb::::save_signed_transaction(txn, signed.nonce, tx); + SignedTransactionDb::set(txn, signed.nonce, &tx.serialize()); (order, signer) } else { @@ -147,7 +147,7 @@ async fn publish_signed_transaction( // If we're trying to publish 5, when the last transaction published was 3, this will delay // publication until the point in time we publish 4 - while let Some(tx) = MainDb::::take_signed_transaction( + while let Some(tx) = SignedTransactionDb::take_signed_transaction( txn, tributary .next_nonce(&signer, &order) @@ -181,7 +181,7 @@ async fn handle_processor_message( network: NetworkId, msg: &processors::Message, ) -> bool { - if MainDb::::handled_message(db, msg.network, msg.id) { + if HandledMessageDb::get(db, msg.network, msg.id).is_some() { return true; } @@ -219,7 +219,7 @@ async fn handle_processor_message( .iter() .map(|plan| plan.session) .filter(|session| { - !MainDb::::is_tributary_retired(&txn, ValidatorSet { network, session: *session }) + RetiredTributaryDb::get(&txn, ValidatorSet { network, session: *session }).is_none() }) .collect::>(); @@ -293,7 +293,7 @@ async fn handle_processor_message( batch.network, msg.network, "processor sent us a batch for a different network than it was for", ); - MainDb::::save_expected_batch(&mut txn, batch); + ExpectedBatchDb::save_expected_batch(&mut txn, batch); None } // If this is a new Batch, immediately publish it (if we can) @@ -306,7 +306,7 @@ async fn handle_processor_message( log::debug!("received batch {:?} {}", batch.batch.network, batch.batch.id); // Save this batch to the disk - MainDb::::save_batch(&mut txn, batch.clone()); + BatchDb::set(&mut txn, batch.batch.network, batch.batch.id, &batch.clone()); // Get the next-to-execute batch ID let mut next = substrate::get_expected_next_batch(serai, network).await; @@ -314,7 +314,7 @@ async fn handle_processor_message( // Since we have a new batch, publish all batches yet to be published to Serai // This handles the edge-case where batch n+1 is signed before batch n is let mut batches = VecDeque::new(); - while let Some(batch) = MainDb::::batch(&txn, network, next) { + while let Some(batch) = BatchDb::get(&txn, network, next) { batches.push_back(batch); next += 1; } @@ -359,10 +359,12 @@ async fn handle_processor_message( // If we have a relevant Tributary, check it's actually still relevant and has yet to be retired if let Some(relevant_tributary_value) = relevant_tributary { - if MainDb::::is_tributary_retired( + if RetiredTributaryDb::get( &txn, ValidatorSet { network: msg.network, session: relevant_tributary_value }, - ) { + ) + .is_some() + { relevant_tributary = None; } } @@ -491,7 +493,7 @@ async fn handle_processor_message( } sign::ProcessorMessage::Preprocess { id, preprocesses } => { if id.attempt == 0 { - MainDb::::save_first_preprocess( + FirstPreprocessDb::save_first_preprocess( &mut txn, network, RecognizedIdType::Plan, @@ -563,7 +565,7 @@ async fn handle_processor_message( // If this is the first attempt instance, wait until we synchronize around the batch // first if id.attempt == 0 { - MainDb::::save_first_preprocess( + FirstPreprocessDb::save_first_preprocess( &mut txn, spec.set().network, RecognizedIdType::Batch, @@ -588,8 +590,8 @@ async fn handle_processor_message( // all prior published `Batch`s // TODO: This assumes BatchPreprocess is immediately after Batch // Ensure that assumption - let last_received = MainDb::::last_received_batch(&txn, msg.network).unwrap(); - let handover_batch = MainDb::::handover_batch(&txn, spec.set()); + let last_received = LastRecievedBatchDb::get(&txn, msg.network).unwrap(); + let handover_batch = HandoverBatchDb::get(&txn, spec.set()); let mut queue = false; if let Some(handover_batch) = handover_batch { // There is a race condition here. We may verify all `Batch`s from the prior set, @@ -604,7 +606,7 @@ async fn handle_processor_message( // To fix this, if this is after the handover `Batch` and we have yet to verify // publication of the handover `Batch`, don't yet yield the provided. if last_received > handover_batch { - if let Some(last_verified) = MainDb::::last_verified_batch(&txn, msg.network) { + if let Some(last_verified) = LastVerifiedBatchDb::get(&txn, msg.network) { if last_verified < handover_batch { queue = true; } @@ -613,11 +615,11 @@ async fn handle_processor_message( } } } else { - MainDb::::set_handover_batch(&mut txn, spec.set(), last_received); + HandoverBatchDb::set_handover_batch(&mut txn, spec.set(), last_received); // If this isn't the first batch, meaning we do have to verify all prior batches, and // the prior Batch hasn't been verified yet... if (last_received != 0) && - MainDb::::last_verified_batch(&txn, msg.network) + LastVerifiedBatchDb::get(&txn, msg.network) .map(|last_verified| last_verified < (last_received - 1)) .unwrap_or(true) { @@ -627,14 +629,14 @@ async fn handle_processor_message( } if queue { - MainDb::::queue_batch(&mut txn, spec.set(), intended); + QueuedBatchesDb::queue(&mut txn, spec.set(), intended); vec![] } else { // Because this is post-verification of the handover batch, take all queued `Batch`s // now to ensure we don't provide this before an already queued Batch // This *may* be an unreachable case due to how last_verified_batch is set, yet it // doesn't hurt to have as a defensive pattern - let mut res = MainDb::::take_queued_batches(&mut txn, spec.set()); + let mut res = QueuedBatchesDb::take(&mut txn, spec.set()); res.push(intended); res } @@ -702,7 +704,7 @@ async fn handle_processor_message( } } - MainDb::::save_handled_message(&mut txn, msg.network, msg.id); + HandledMessageDb::set(&mut txn, msg.network, msg.id, &vec![] as &Vec); txn.commit(); true @@ -828,7 +830,7 @@ async fn handle_cosigns_and_batch_publication( let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await; let mut txn = db.txn(); let mut to_publish = vec![]; - let start_id = MainDb::::last_verified_batch(&txn, network) + let start_id = LastVerifiedBatchDb::get(&txn, network) .map(|already_verified| already_verified + 1) .unwrap_or(0); if let Some(last_id) = @@ -838,9 +840,10 @@ async fn handle_cosigns_and_batch_publication( // `Batch` // If so, we need to publish queued provided `Batch` transactions for batch in start_id ..= last_id { - let is_pre_handover = MainDb::::is_handover_batch(&txn, network, batch + 1); - if let Some(set) = is_pre_handover { - let mut queued = MainDb::::take_queued_batches(&mut txn, set); + let is_pre_handover = LookupHandoverBatchDb::get(&txn, network, batch + 1); + if let Some(session) = is_pre_handover { + let set = ValidatorSet { network, session }; + let mut queued = QueuedBatchesDb::take(&mut txn, set); // is_handover_batch is only set for handover `Batch`s we're participating in, making // this safe if queued.is_empty() { @@ -851,14 +854,14 @@ async fn handle_cosigns_and_batch_publication( to_publish.push((set.session, queued.remove(0))); // Re-queue the remaining batches for remaining in queued { - MainDb::::queue_batch(&mut txn, set, remaining); + QueuedBatchesDb::queue(&mut txn, set, remaining); } } - let is_handover = MainDb::::is_handover_batch(&txn, network, batch); - if let Some(set) = is_handover { - for queued in MainDb::::take_queued_batches(&mut txn, set) { - to_publish.push((set.session, queued)); + let is_handover = LookupHandoverBatchDb::get(&txn, network, batch); + if let Some(session) = is_handover { + for queued in QueuedBatchesDb::take(&mut txn, ValidatorSet { network, session }) { + to_publish.push((session, queued)); } } } @@ -952,7 +955,7 @@ pub async fn run( let (new_tributary_spec_send, mut new_tributary_spec_recv) = mpsc::unbounded_channel(); // Reload active tributaries from the database - for spec in MainDb::::active_tributaries(&raw_db).1 { + for spec in ActiveTributaryDb::active_tributaries(&raw_db).1 { new_tributary_spec_send.send(spec).unwrap(); } @@ -1058,8 +1061,7 @@ pub async fn run( // This waits until the necessary preprocess is available 0, let get_preprocess = |raw_db, id_type, id| async move { loop { - let Some(preprocess) = MainDb::::first_preprocess(raw_db, set.network, id_type, id) - else { + let Some(preprocess) = FirstPreprocessDb::get(raw_db, set.network, id_type, id) else { log::warn!("waiting for preprocess for recognized ID"); sleep(Duration::from_millis(100)).await; continue; @@ -1096,7 +1098,7 @@ pub async fn run( let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { // If we don't have this Tributary because it's retired, break and move on - if MainDb::::is_tributary_retired(&raw_db, set) { + if RetiredTributaryDb::get(&raw_db, set).is_some() { break; } diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index b6861928a..ea8402a03 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -105,7 +105,7 @@ async fn handle_new_set( // If this txn doesn't finish, this will be re-fired // If we waited to save to the DB, this txn may be finished, preventing re-firing, yet the // prior fired event may have not been received yet - crate::MainDb::::add_participating_in_tributary(txn, &spec); + crate::db::add_participating_in_tributary(txn, &spec); new_tributary_spec.send(spec).unwrap(); } else { @@ -306,7 +306,7 @@ async fn handle_block( if !SubstrateDb::::handled_event(&db.0, hash, event_id) { log::info!("found fresh set retired event {:?}", retired_set); let mut txn = db.0.txn(); - crate::MainDb::::retire_tributary(&mut txn, set); + crate::ActiveTributaryDb::retire_tributary(&mut txn, set); tributary_retired.send(set).unwrap(); SubstrateDb::::handle_event(&mut txn, hash, event_id); txn.commit(); @@ -678,12 +678,12 @@ pub(crate) async fn verify_published_batches( optimistic_up_to: u32, ) -> Option { // TODO: Localize from MainDb to SubstrateDb - let last = crate::MainDb::::last_verified_batch(txn, network); + let last = crate::LastVerifiedBatchDb::get(txn, network); for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to { let Some(on_chain) = SubstrateDb::::batch_instructions_hash(txn, network, id) else { break; }; - let off_chain = crate::MainDb::::expected_batch(txn, network, id).unwrap(); + let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap(); if on_chain != off_chain { // Halt operations on this network and spin, as this is a critical fault loop { @@ -698,8 +698,8 @@ pub(crate) async fn verify_published_batches( sleep(Duration::from_secs(60)).await; } } - crate::MainDb::::save_last_verified_batch(txn, network, id); + crate::LastVerifiedBatchDb::set(txn, network, &id); } - crate::MainDb::::last_verified_batch(txn, network) + crate::LastVerifiedBatchDb::get(txn, network) } diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index d196c8f06..c127bdfa6 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -232,7 +232,7 @@ pub(crate) async fn scan_tributaries_task< let mut tributary_db = raw_db.clone(); loop { // Check if the set was retired, and if so, don't further operate - if crate::MainDb::::is_tributary_retired(&raw_db, spec.set()) { + if crate::db::RetiredTributaryDb::get(&raw_db, spec.set()).is_some() { break; } From 4e8f8473d46be267fa47ca7ee7e45b822f0979d9 Mon Sep 17 00:00:00 2001 From: EmmanuelChthonic Date: Wed, 15 Nov 2023 16:57:38 +0400 Subject: [PATCH 2/7] fixed fmt errors --- coordinator/src/db.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index f680cef55..db4bd9fda 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -22,8 +22,11 @@ create_db!( ActiveTributaryDb: () -> Vec, RetiredTributaryDb: (set: ValidatorSet) -> Vec, SignedTransactionDb: (key: u32) -> Vec, - FirstPreprocessDb: - (network: NetworkId, id_type: RecognizedIdType, id: &Vec) -> Vec>, + FirstPreprocessDb: ( + network: NetworkId, + id_type: RecognizedIdType, + id: &[u8], + ) -> Vec>, LastRecievedBatchDb: (network: NetworkId) -> u32, ExpectedBatchDb: (network: NetworkId, id: u32) -> [u8; 32], BatchDb: (network: NetworkId, id: u32) -> SignedBatch, From 48782cb1c98186a520e655c2f70e430413078ca3 Mon Sep 17 00:00:00 2001 From: EmmanuelChthonic Date: Thu, 16 Nov 2023 12:01:31 +0400 Subject: [PATCH 3/7] converted txn functions to get/set counterparts --- coordinator/src/db.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index db4bd9fda..217b28aeb 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -39,7 +39,7 @@ create_db!( impl ActiveTributaryDb { pub fn active_tributaries(getter: &G) -> (Vec, Vec) { - let bytes = getter.get(Self::key()).unwrap_or_default(); + let bytes =Self::get(getter).unwrap_or_default(); let mut bytes_ref: &[u8] = bytes.as_ref(); let mut tributaries = vec![]; @@ -63,15 +63,14 @@ impl ActiveTributaryDb { for active in active { active.write(&mut bytes).unwrap(); } - txn.put(Self::key(), bytes); - txn.put(RetiredTributaryDb::key(set), []); + Self::set(txn, &bytes); + RetiredTributaryDb::set(txn, set, &vec![] as &Vec); } } pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec) { - txn.put(InTributaryDb::key(spec.set()), []); + InTributaryDb::set(txn, spec.set(), &vec![] as &Vec); - let key = ActiveTributaryDb::key(); let (mut existing_bytes, existing) = ActiveTributaryDb::active_tributaries(txn); for tributary in &existing { if tributary == spec { @@ -80,15 +79,14 @@ pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec } spec.write(&mut existing_bytes).unwrap(); - txn.put(key, existing_bytes); + ActiveTributaryDb::set(txn, &existing_bytes); } impl SignedTransactionDb { pub fn take_signed_transaction(txn: &mut impl DbTxn, nonce: u32) -> Option { - let key = Self::key(nonce); - let res = txn.get(&key).map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); + let res = SignedTransactionDb::get(txn, nonce).map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); if res.is_some() { - txn.del(&key); + txn.del(Self::key(nonce)); } res } @@ -126,15 +124,14 @@ impl HandoverBatchDb { } impl QueuedBatchesDb { pub fn queue(txn: &mut impl DbTxn, set: ValidatorSet, batch: Transaction) { - let key = Self::key(set); - let mut batches = txn.get(&key).unwrap_or_default(); + let mut batches = Self::get(txn, set).unwrap_or_default(); batches.extend(batch.serialize()); - txn.put(&key, batches); + Self::set(txn, set, &batches); } + pub fn take(txn: &mut impl DbTxn, set: ValidatorSet) -> Vec { - let key = Self::key(set); - let batches_vec = txn.get(&key).unwrap_or_default(); - txn.del(&key); + let batches_vec = Self::get(txn, set).unwrap_or_default(); + txn.del(&Self::key(set)); let mut batches: &[u8] = &batches_vec; let mut res = vec![]; From b7317d0c891e491e2723ea604a13c1da3747f442 Mon Sep 17 00:00:00 2001 From: EmmanuelChthonic Date: Thu, 16 Nov 2023 14:19:28 +0400 Subject: [PATCH 4/7] use take_signed_transaction function --- coordinator/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index ae85e9720..d86afbbc2 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -138,7 +138,7 @@ async fn publish_signed_transaction( // Safe as we should deterministically create transactions, meaning if this is already on-disk, // it's what we're saving now - SignedTransactionDb::set(txn, signed.nonce, &tx.serialize()); + SignedTransactionDb::take_signed_transaction(txn, signed.nonce); (order, signer) } else { From 0dd7acecba659a6723c7b4d2c30a64cf6cc015e1 Mon Sep 17 00:00:00 2001 From: EmmanuelChthonic Date: Tue, 21 Nov 2023 16:02:36 +0400 Subject: [PATCH 5/7] fix for two fo the tests --- coordinator/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index d86afbbc2..ae85e9720 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -138,7 +138,7 @@ async fn publish_signed_transaction( // Safe as we should deterministically create transactions, meaning if this is already on-disk, // it's what we're saving now - SignedTransactionDb::take_signed_transaction(txn, signed.nonce); + SignedTransactionDb::set(txn, signed.nonce, &tx.serialize()); (order, signer) } else { From f62dedd01ae6edecfd5d23f32dcb141ca7281a3d Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 7 Dec 2023 05:07:00 -0500 Subject: [PATCH 6/7] Misc tweaks --- coordinator/src/db.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 217b28aeb..c6308bfaa 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -25,7 +25,7 @@ create_db!( FirstPreprocessDb: ( network: NetworkId, id_type: RecognizedIdType, - id: &[u8], + id: &[u8] ) -> Vec>, LastRecievedBatchDb: (network: NetworkId) -> u32, ExpectedBatchDb: (network: NetworkId, id: u32) -> [u8; 32], @@ -39,7 +39,7 @@ create_db!( impl ActiveTributaryDb { pub fn active_tributaries(getter: &G) -> (Vec, Vec) { - let bytes =Self::get(getter).unwrap_or_default(); + let bytes = Self::get(getter).unwrap_or_default(); let mut bytes_ref: &[u8] = bytes.as_ref(); let mut tributaries = vec![]; @@ -69,7 +69,7 @@ impl ActiveTributaryDb { } pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec) { - InTributaryDb::set(txn, spec.set(), &vec![] as &Vec); + InTributaryDb::set(txn, spec.set(), &vec![] as &Vec); let (mut existing_bytes, existing) = ActiveTributaryDb::active_tributaries(txn); for tributary in &existing { @@ -84,7 +84,8 @@ pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec impl SignedTransactionDb { pub fn take_signed_transaction(txn: &mut impl DbTxn, nonce: u32) -> Option { - let res = SignedTransactionDb::get(txn, nonce).map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); + let res = SignedTransactionDb::get(txn, nonce) + .map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); if res.is_some() { txn.del(Self::key(nonce)); } @@ -100,11 +101,11 @@ impl FirstPreprocessDb { id: &[u8], preprocess: Vec>, ) { - if let Some(existing) = FirstPreprocessDb::get(txn, network, id_type, &id.to_vec()) { + if let Some(existing) = FirstPreprocessDb::get(txn, network, id_type, id) { assert_eq!(existing, preprocess, "saved a distinct first preprocess"); return; } - FirstPreprocessDb::set(txn, network, id_type, &id.to_vec(), &preprocess); + FirstPreprocessDb::set(txn, network, id_type, id, &preprocess); } } From 0077d5cfad7b21c01127a1ca497fb294f8324d7c Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 7 Dec 2023 07:17:10 -0500 Subject: [PATCH 7/7] Minor tweaks --- coordinator/src/db.rs | 61 +++++++++++++++++--------------- coordinator/src/main.rs | 16 ++++++--- coordinator/src/substrate/mod.rs | 2 +- 3 files changed, 45 insertions(+), 34 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index c6308bfaa..560946bc9 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -17,17 +17,16 @@ use crate::tributary::{TributarySpec, Transaction, scanner::RecognizedIdType}; create_db!( MainDb { - HandledMessageDb: (network: NetworkId, id: u64) -> Vec, - InTributaryDb: (set: ValidatorSet) -> Vec, + HandledMessageDb: (network: NetworkId) -> u64, ActiveTributaryDb: () -> Vec, - RetiredTributaryDb: (set: ValidatorSet) -> Vec, - SignedTransactionDb: (key: u32) -> Vec, + RetiredTributaryDb: (set: ValidatorSet) -> (), + SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec, FirstPreprocessDb: ( network: NetworkId, id_type: RecognizedIdType, id: &[u8] ) -> Vec>, - LastRecievedBatchDb: (network: NetworkId) -> u32, + LastReceivedBatchDb: (network: NetworkId) -> u32, ExpectedBatchDb: (network: NetworkId, id: u32) -> [u8; 32], BatchDb: (network: NetworkId, id: u32) -> SignedBatch, LastVerifiedBatchDb: (network: NetworkId) -> u32, @@ -50,6 +49,18 @@ impl ActiveTributaryDb { (bytes, tributaries) } + pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec) { + let (mut existing_bytes, existing) = ActiveTributaryDb::active_tributaries(txn); + for tributary in &existing { + if tributary == spec { + return; + } + } + + spec.write(&mut existing_bytes).unwrap(); + ActiveTributaryDb::set(txn, &existing_bytes); + } + pub fn retire_tributary(txn: &mut impl DbTxn, set: ValidatorSet) { let mut active = Self::active_tributaries(txn).1; for i in 0 .. active.len() { @@ -64,30 +75,20 @@ impl ActiveTributaryDb { active.write(&mut bytes).unwrap(); } Self::set(txn, &bytes); - RetiredTributaryDb::set(txn, set, &vec![] as &Vec); + RetiredTributaryDb::set(txn, set, &()); } } -pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec) { - InTributaryDb::set(txn, spec.set(), &vec![] as &Vec); - - let (mut existing_bytes, existing) = ActiveTributaryDb::active_tributaries(txn); - for tributary in &existing { - if tributary == spec { - return; - } - } - - spec.write(&mut existing_bytes).unwrap(); - ActiveTributaryDb::set(txn, &existing_bytes); -} - impl SignedTransactionDb { - pub fn take_signed_transaction(txn: &mut impl DbTxn, nonce: u32) -> Option { - let res = SignedTransactionDb::get(txn, nonce) + pub fn take_signed_transaction( + txn: &mut impl DbTxn, + order: &[u8], + nonce: u32, + ) -> Option { + let res = SignedTransactionDb::get(txn, order, nonce) .map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); if res.is_some() { - txn.del(Self::key(nonce)); + Self::del(txn, order, nonce); } res } @@ -111,9 +112,13 @@ impl FirstPreprocessDb { impl ExpectedBatchDb { pub fn save_expected_batch(txn: &mut impl DbTxn, batch: &Batch) { - LastRecievedBatchDb::set(txn, batch.network, &batch.id); - let hash: [u8; 32] = Blake2b::::digest(batch.instructions.encode()).into(); - Self::set(txn, batch.network, batch.id, &hash); + LastReceivedBatchDb::set(txn, batch.network, &batch.id); + Self::set( + txn, + batch.network, + batch.id, + &Blake2b::::digest(batch.instructions.encode()).into(), + ); } } @@ -126,15 +131,15 @@ impl HandoverBatchDb { impl QueuedBatchesDb { pub fn queue(txn: &mut impl DbTxn, set: ValidatorSet, batch: Transaction) { let mut batches = Self::get(txn, set).unwrap_or_default(); - batches.extend(batch.serialize()); + batch.write(&mut batches).unwrap(); Self::set(txn, set, &batches); } pub fn take(txn: &mut impl DbTxn, set: ValidatorSet) -> Vec { let batches_vec = Self::get(txn, set).unwrap_or_default(); txn.del(&Self::key(set)); - let mut batches: &[u8] = &batches_vec; + let mut batches: &[u8] = &batches_vec; let mut res = vec![]; while !batches.is_empty() { res.push(Transaction::read(&mut batches).unwrap()); diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index ae85e9720..708ec9d02 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -138,7 +138,7 @@ async fn publish_signed_transaction( // Safe as we should deterministically create transactions, meaning if this is already on-disk, // it's what we're saving now - SignedTransactionDb::set(txn, signed.nonce, &tx.serialize()); + SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize()); (order, signer) } else { @@ -149,6 +149,7 @@ async fn publish_signed_transaction( // publication until the point in time we publish 4 while let Some(tx) = SignedTransactionDb::take_signed_transaction( txn, + &order, tributary .next_nonce(&signer, &order) .await @@ -181,8 +182,13 @@ async fn handle_processor_message( network: NetworkId, msg: &processors::Message, ) -> bool { - if HandledMessageDb::get(db, msg.network, msg.id).is_some() { - return true; + #[allow(clippy::nonminimal_bool)] + if let Some(already_handled) = HandledMessageDb::get(db, msg.network) { + assert!(!(already_handled > msg.id)); + assert!((already_handled == msg.id) || (already_handled == msg.id - 1)); + if already_handled == msg.id { + return true; + } } let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await; @@ -590,7 +596,7 @@ async fn handle_processor_message( // all prior published `Batch`s // TODO: This assumes BatchPreprocess is immediately after Batch // Ensure that assumption - let last_received = LastRecievedBatchDb::get(&txn, msg.network).unwrap(); + let last_received = LastReceivedBatchDb::get(&txn, msg.network).unwrap(); let handover_batch = HandoverBatchDb::get(&txn, spec.set()); let mut queue = false; if let Some(handover_batch) = handover_batch { @@ -704,7 +710,7 @@ async fn handle_processor_message( } } - HandledMessageDb::set(&mut txn, msg.network, msg.id, &vec![] as &Vec); + HandledMessageDb::set(&mut txn, msg.network, &msg.id); txn.commit(); true diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index ea8402a03..2d33a9a2f 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -105,7 +105,7 @@ async fn handle_new_set( // If this txn doesn't finish, this will be re-fired // If we waited to save to the DB, this txn may be finished, preventing re-firing, yet the // prior fired event may have not been received yet - crate::db::add_participating_in_tributary(txn, &spec); + crate::ActiveTributaryDb::add_participating_in_tributary(txn, &spec); new_tributary_spec.send(spec).unwrap(); } else {