Skip to content

Commit

Permalink
processor/db.rs macro implimentation
Browse files Browse the repository at this point in the history
  • Loading branch information
econsta committed Nov 16, 2023
1 parent 30a77d8 commit 8f73386
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 50 deletions.
50 changes: 10 additions & 40 deletions processor/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core::marker::PhantomData;
use std::io::Read;

use scale::{Encode, Decode};
Expand All @@ -8,44 +7,18 @@ pub use serai_db::*;

use crate::networks::{Block, Network};

#[derive(Debug)]
pub struct MainDb<N: Network, D: Db>(D, PhantomData<N>);
impl<N: Network, D: Db> MainDb<N, D> {
pub fn new(db: D) -> Self {
Self(db, PhantomData)
create_db!(
MainDb {
HandledMessageDb: (id: u64) -> Vec<u8>,
PendingActivationsDb: () -> Vec<u8>
}
);

fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"MAIN", dst, key)
}

fn handled_key(id: u64) -> Vec<u8> {
Self::main_key(b"handled", id.to_le_bytes())
}
pub fn handled_message(&self, id: u64) -> bool {
self.0.get(Self::handled_key(id)).is_some()
}
pub fn handle_message(txn: &mut D::Transaction<'_>, id: u64) {
txn.put(Self::handled_key(id), [])
}

fn pending_activation_key() -> Vec<u8> {
Self::main_key(b"pending_activation", [])
}
pub fn set_pending_activation(
txn: &mut D::Transaction<'_>,
block_before_queue_block: <N::Block as Block<N>>::Id,
set: ValidatorSet,
key_pair: KeyPair,
) {
let mut buf = (set, key_pair).encode();
buf.extend(block_before_queue_block.as_ref());
txn.put(Self::pending_activation_key(), buf);
}
pub fn pending_activation<G: Get>(
getter: &G,
impl PendingActivationsDb {
pub fn pending_activation<N: Network>(
getter: &impl Get,
) -> Option<(<N::Block as Block<N>>::Id, ValidatorSet, KeyPair)> {
if let Some(bytes) = getter.get(Self::pending_activation_key()) {
if let Some(bytes) = Self::get(getter) {
if !bytes.is_empty() {
let mut slice = bytes.as_slice();
let (set, key_pair) = <(ValidatorSet, KeyPair)>::decode(&mut slice).unwrap();
Expand All @@ -57,7 +30,4 @@ impl<N: Network, D: Db> MainDb<N, D> {
}
None
}
pub fn clear_pending_activation(txn: &mut D::Transaction<'_>) {
txn.put(Self::pending_activation_key(), []);
}
}
}
23 changes: 13 additions & 10 deletions processor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use messages::{
CoordinatorMessage,
};

use scale::Encode;

use serai_env as env;

use message_queue::{Service, client::MessageQueue};
Expand Down Expand Up @@ -352,7 +354,9 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
// We can't set these keys for activation until we know their queue block, which we
// won't until the next Batch is confirmed
// Set this variable so when we get the next Batch event, we can handle it
MainDb::<N, D>::set_pending_activation(txn, block_before_queue_block, set, key_pair);
let mut buf = (set, key_pair).encode();
buf.extend(block_before_queue_block.as_ref());
PendingActivationsDb::set(txn, &buf);
}
}

Expand All @@ -365,7 +369,7 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
} => {
assert_eq!(network_id, N::NETWORK, "coordinator sent us data for another network");

if let Some((block, set, key_pair)) = MainDb::<N, D>::pending_activation(txn) {
if let Some((block, set, key_pair)) = PendingActivationsDb::pending_activation::<N>(txn) {
// Only run if this is a Batch belonging to a distinct block
if context.network_latest_finalized_block.as_ref() != block.as_ref() {
let mut queue_block = <N::Block as Block<N>>::Id::default();
Expand All @@ -387,8 +391,8 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
activation_number,
)
.await;

MainDb::<N, D>::clear_pending_activation(txn);
//clear pending activation
PendingActivationsDb::set(txn, &vec![] as &Vec<u8>);
}
}

Expand Down Expand Up @@ -445,7 +449,7 @@ async fn boot<N: Network, D: Db, Co: Coordinator>(
raw_db: &mut D,
network: &N,
coordinator: &mut Co,
) -> (MainDb<N, D>, TributaryMutable<N, D>, SubstrateMutable<N, D>) {
) -> (D, TributaryMutable<N, D>, SubstrateMutable<N, D>) {
let mut entropy_transcript = {
let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't specified"));
if entropy.len() != 64 {
Expand Down Expand Up @@ -488,8 +492,6 @@ async fn boot<N: Network, D: Db, Co: Coordinator>(
let mut substrate_signer = None;
let mut signers = HashMap::new();

let main_db = MainDb::<N, _>::new(raw_db.clone());

for (i, key) in current_keys.iter().enumerate() {
let Some((substrate_keys, network_keys)) = key_gen.keys(key) else { continue };
let network_key = network_keys[0].group_key();
Expand Down Expand Up @@ -536,7 +538,7 @@ async fn boot<N: Network, D: Db, Co: Coordinator>(
tokio::spawn(Signer::<N, D>::rebroadcast_task(raw_db.clone(), network.clone()));

(
main_db,
raw_db.clone(),
TributaryMutable { key_gen, substrate_signer, cosigner: None, signers },
multisig_manager,
)
Expand Down Expand Up @@ -572,9 +574,10 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
assert_eq!(msg.id, (last_coordinator_msg.unwrap_or(msg.id - 1) + 1));
last_coordinator_msg = Some(msg.id);


// Only handle this if we haven't already
if !main_db.handled_message(msg.id) {
MainDb::<N, D>::handle_message(&mut txn, msg.id);
if HandledMessageDb::get(&main_db, msg.id).is_none() {
HandledMessageDb::set(&mut txn, msg.id, &vec![] as &Vec<u8>);

// This is isolated to better think about how its ordered, or rather, about how the other
// cases aren't ordered
Expand Down

0 comments on commit 8f73386

Please sign in to comment.