diff --git a/processor/src/db.rs b/processor/src/db.rs index 212341d0c..d2d4da427 100644 --- a/processor/src/db.rs +++ b/processor/src/db.rs @@ -1,4 +1,3 @@ -use core::marker::PhantomData; use std::io::Read; use scale::{Encode, Decode}; @@ -8,44 +7,18 @@ pub use serai_db::*; use crate::networks::{Block, Network}; -#[derive(Debug)] -pub struct MainDb(D, PhantomData); -impl MainDb { - pub fn new(db: D) -> Self { - Self(db, PhantomData) +create_db!( + MainDb { + HandledMessageDb: (id: u64) -> (), + PendingActivationsDb: () -> Vec } +); - fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"MAIN", dst, key) - } - - fn handled_key(id: u64) -> Vec { - Self::main_key(b"handled", id.to_le_bytes()) - } - pub fn handled_message(&self, id: u64) -> bool { - self.0.get(Self::handled_key(id)).is_some() - } - pub fn handle_message(txn: &mut D::Transaction<'_>, id: u64) { - txn.put(Self::handled_key(id), []) - } - - fn pending_activation_key() -> Vec { - Self::main_key(b"pending_activation", []) - } - pub fn set_pending_activation( - txn: &mut D::Transaction<'_>, - block_before_queue_block: >::Id, - set: ValidatorSet, - key_pair: KeyPair, - ) { - let mut buf = (set, key_pair).encode(); - buf.extend(block_before_queue_block.as_ref()); - txn.put(Self::pending_activation_key(), buf); - } - pub fn pending_activation( - getter: &G, +impl PendingActivationsDb { + pub fn pending_activation( + getter: &impl Get, ) -> Option<(>::Id, ValidatorSet, KeyPair)> { - if let Some(bytes) = getter.get(Self::pending_activation_key()) { + if let Some(bytes) = Self::get(getter) { if !bytes.is_empty() { let mut slice = bytes.as_slice(); let (set, key_pair) = <(ValidatorSet, KeyPair)>::decode(&mut slice).unwrap(); @@ -57,7 +30,14 @@ impl MainDb { } None } - pub fn clear_pending_activation(txn: &mut D::Transaction<'_>) { - txn.put(Self::pending_activation_key(), []); + pub fn set_pending_activation( + txn: &mut impl DbTxn, + block_before_queue_block: >::Id, + set: ValidatorSet, + key_pair: KeyPair, + ) { + let mut buf = (set, key_pair).encode(); + buf.extend(block_before_queue_block.as_ref()); + Self::set(txn, &buf); } } diff --git a/processor/src/main.rs b/processor/src/main.rs index 54e2e6758..238297040 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -352,7 +352,12 @@ async fn handle_coordinator_msg( // We can't set these keys for activation until we know their queue block, which we // won't until the next Batch is confirmed // Set this variable so when we get the next Batch event, we can handle it - MainDb::::set_pending_activation(txn, block_before_queue_block, set, key_pair); + PendingActivationsDb::set_pending_activation::( + txn, + block_before_queue_block, + set, + key_pair, + ); } } @@ -365,7 +370,7 @@ async fn handle_coordinator_msg( } => { assert_eq!(network_id, N::NETWORK, "coordinator sent us data for another network"); - if let Some((block, set, key_pair)) = MainDb::::pending_activation(txn) { + if let Some((block, set, key_pair)) = PendingActivationsDb::pending_activation::(txn) { // Only run if this is a Batch belonging to a distinct block if context.network_latest_finalized_block.as_ref() != block.as_ref() { let mut queue_block = >::Id::default(); @@ -387,8 +392,8 @@ async fn handle_coordinator_msg( activation_number, ) .await; - - MainDb::::clear_pending_activation(txn); + //clear pending activation + txn.del(PendingActivationsDb::key()); } } @@ -445,7 +450,7 @@ async fn boot( raw_db: &mut D, network: &N, coordinator: &mut Co, -) -> (MainDb, TributaryMutable, SubstrateMutable) { +) -> (D, TributaryMutable, SubstrateMutable) { let mut entropy_transcript = { let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't specified")); if entropy.len() != 64 { @@ -488,8 +493,6 @@ async fn boot( let mut batch_signer = None; let mut signers = HashMap::new(); - let main_db = MainDb::::new(raw_db.clone()); - for (i, key) in current_keys.iter().enumerate() { let Some((substrate_keys, network_keys)) = key_gen.keys(key) else { continue }; let network_key = network_keys[0].group_key(); @@ -535,7 +538,11 @@ async fn boot( // This hedges against being dropped due to full mempools, temporarily too low of a fee... tokio::spawn(Signer::::rebroadcast_task(raw_db.clone(), network.clone())); - (main_db, TributaryMutable { key_gen, batch_signer, cosigner: None, signers }, multisig_manager) + ( + raw_db.clone(), + TributaryMutable { key_gen, batch_signer, cosigner: None, signers }, + multisig_manager, + ) } #[allow(clippy::await_holding_lock)] // Needed for txn, unfortunately can't be down-scoped @@ -568,9 +575,10 @@ async fn run(mut raw_db: D, network: N, mut assert_eq!(msg.id, (last_coordinator_msg.unwrap_or(msg.id - 1) + 1)); last_coordinator_msg = Some(msg.id); + // Only handle this if we haven't already - if !main_db.handled_message(msg.id) { - MainDb::::handle_message(&mut txn, msg.id); + if HandledMessageDb::get(&main_db, msg.id).is_none() { + HandledMessageDb::set(&mut txn, msg.id, &()); // This is isolated to better think about how its ordered, or rather, about how the other // cases aren't ordered