Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coordinator/src/db.rs db macro implimentation #431

Merged
merged 7 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 70 additions & 142 deletions coordinator/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use core::marker::PhantomData;

use blake2::{
digest::{consts::U32, Digest},
Blake2b,
};

use scale::{Encode, Decode};
use scale::Encode;
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
Expand All @@ -17,31 +15,30 @@ pub use serai_db::*;
use ::tributary::ReadWrite;
use crate::tributary::{TributarySpec, Transaction, scanner::RecognizedIdType};

#[derive(Debug)]
pub struct MainDb<D: Db>(PhantomData<D>);
impl<D: Db> MainDb<D> {
fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"coordinator_main", dst, key)
}

fn handled_message_key(network: NetworkId, id: u64) -> Vec<u8> {
Self::main_key(b"handled_message", (network, id).encode())
}
pub fn save_handled_message(txn: &mut D::Transaction<'_>, network: NetworkId, id: u64) {
txn.put(Self::handled_message_key(network, id), []);
}
pub fn handled_message<G: Get>(getter: &G, network: NetworkId, id: u64) -> bool {
getter.get(Self::handled_message_key(network, id)).is_some()
}

fn active_tributaries_key() -> Vec<u8> {
Self::main_key(b"active_tributaries", [])
}
fn retired_tributary_key(set: ValidatorSet) -> Vec<u8> {
Self::main_key(b"retired_tributary", set.encode())
}
create_db!(
MainDb {
HandledMessageDb: (network: NetworkId) -> u64,
ActiveTributaryDb: () -> Vec<u8>,
RetiredTributaryDb: (set: ValidatorSet) -> (),
SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec<u8>,
FirstPreprocessDb: (
network: NetworkId,
id_type: RecognizedIdType,
id: &[u8]
) -> Vec<Vec<u8>>,
LastReceivedBatchDb: (network: NetworkId) -> u32,
ExpectedBatchDb: (network: NetworkId, id: u32) -> [u8; 32],
BatchDb: (network: NetworkId, id: u32) -> SignedBatch,
LastVerifiedBatchDb: (network: NetworkId) -> u32,
HandoverBatchDb: (set: ValidatorSet) -> u32,
LookupHandoverBatchDb: (network: NetworkId, batch: u32) -> Session,
QueuedBatchesDb: (set: ValidatorSet) -> Vec<u8>
}
);

impl ActiveTributaryDb {
pub fn active_tributaries<G: Get>(getter: &G) -> (Vec<u8>, Vec<TributarySpec>) {
let bytes = getter.get(Self::active_tributaries_key()).unwrap_or(vec![]);
let bytes = Self::get(getter).unwrap_or_default();
let mut bytes_ref: &[u8] = bytes.as_ref();

let mut tributaries = vec![];
Expand All @@ -51,19 +48,20 @@ impl<D: Db> MainDb<D> {

(bytes, tributaries)
}
pub fn add_participating_in_tributary(txn: &mut D::Transaction<'_>, spec: &TributarySpec) {
let key = Self::active_tributaries_key();
let (mut existing_bytes, existing) = Self::active_tributaries(txn);

pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec) {
let (mut existing_bytes, existing) = ActiveTributaryDb::active_tributaries(txn);
for tributary in &existing {
if tributary == spec {
return;
}
}

spec.write(&mut existing_bytes).unwrap();
txn.put(key, existing_bytes);
ActiveTributaryDb::set(txn, &existing_bytes);
}
pub fn retire_tributary(txn: &mut D::Transaction<'_>, set: ValidatorSet) {

pub fn retire_tributary(txn: &mut impl DbTxn, set: ValidatorSet) {
let mut active = Self::active_tributaries(txn).1;
for i in 0 .. active.len() {
if active[i].set() == set {
Expand All @@ -76,142 +74,72 @@ impl<D: Db> MainDb<D> {
for active in active {
active.write(&mut bytes).unwrap();
}
txn.put(Self::active_tributaries_key(), bytes);
txn.put(Self::retired_tributary_key(set), []);
}
pub fn is_tributary_retired<G: Get>(getter: &G, set: ValidatorSet) -> bool {
getter.get(Self::retired_tributary_key(set)).is_some()
Self::set(txn, &bytes);
RetiredTributaryDb::set(txn, set, &());
}
}

fn signed_transaction_key(nonce: u32) -> Vec<u8> {
Self::main_key(b"signed_transaction", nonce.to_le_bytes())
}
pub fn save_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32, tx: Transaction) {
txn.put(Self::signed_transaction_key(nonce), tx.serialize());
}
pub fn take_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32) -> Option<Transaction> {
let key = Self::signed_transaction_key(nonce);
let res = txn.get(&key).map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap());
impl SignedTransactionDb {
pub fn take_signed_transaction(
txn: &mut impl DbTxn,
order: &[u8],
nonce: u32,
) -> Option<Transaction> {
let res = SignedTransactionDb::get(txn, order, nonce)
.map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap());
if res.is_some() {
txn.del(&key);
Self::del(txn, order, nonce);
}
res
}
}

fn first_preprocess_key(network: NetworkId, id_type: RecognizedIdType, id: &[u8]) -> Vec<u8> {
Self::main_key(b"first_preprocess", (network, id_type, id).encode())
}
impl FirstPreprocessDb {
pub fn save_first_preprocess(
txn: &mut D::Transaction<'_>,
txn: &mut impl DbTxn,
network: NetworkId,
id_type: RecognizedIdType,
id: &[u8],
preprocess: Vec<Vec<u8>>,
) {
let preprocess = preprocess.encode();
let key = Self::first_preprocess_key(network, id_type, id);
if let Some(existing) = txn.get(&key) {
if let Some(existing) = FirstPreprocessDb::get(txn, network, id_type, id) {
assert_eq!(existing, preprocess, "saved a distinct first preprocess");
return;
}
txn.put(key, preprocess);
}
pub fn first_preprocess<G: Get>(
getter: &G,
network: NetworkId,
id_type: RecognizedIdType,
id: &[u8],
) -> Option<Vec<Vec<u8>>> {
getter
.get(Self::first_preprocess_key(network, id_type, id))
.map(|bytes| Vec::<_>::decode(&mut bytes.as_slice()).unwrap())
FirstPreprocessDb::set(txn, network, id_type, id, &preprocess);
}
}

fn last_received_batch_key(network: NetworkId) -> Vec<u8> {
Self::main_key(b"last_received_batch", network.encode())
}
fn expected_batch_key(network: NetworkId, id: u32) -> Vec<u8> {
Self::main_key(b"expected_batch", (network, id).encode())
}
pub fn save_expected_batch(txn: &mut D::Transaction<'_>, batch: &Batch) {
txn.put(Self::last_received_batch_key(batch.network), batch.id.to_le_bytes());
txn.put(
Self::expected_batch_key(batch.network, batch.id),
Blake2b::<U32>::digest(batch.instructions.encode()),
impl ExpectedBatchDb {
pub fn save_expected_batch(txn: &mut impl DbTxn, batch: &Batch) {
LastReceivedBatchDb::set(txn, batch.network, &batch.id);
Self::set(
txn,
batch.network,
batch.id,
&Blake2b::<U32>::digest(batch.instructions.encode()).into(),
);
}
pub fn last_received_batch<G: Get>(getter: &G, network: NetworkId) -> Option<u32> {
getter
.get(Self::last_received_batch_key(network))
.map(|id| u32::from_le_bytes(id.try_into().unwrap()))
}
pub fn expected_batch<G: Get>(getter: &G, network: NetworkId, id: u32) -> Option<[u8; 32]> {
getter.get(Self::expected_batch_key(network, id)).map(|batch| batch.try_into().unwrap())
}

fn batch_key(network: NetworkId, id: u32) -> Vec<u8> {
Self::main_key(b"batch", (network, id).encode())
}
pub fn save_batch(txn: &mut D::Transaction<'_>, batch: SignedBatch) {
txn.put(Self::batch_key(batch.batch.network, batch.batch.id), batch.encode());
}
pub fn batch<G: Get>(getter: &G, network: NetworkId, id: u32) -> Option<SignedBatch> {
getter
.get(Self::batch_key(network, id))
.map(|batch| SignedBatch::decode(&mut batch.as_ref()).unwrap())
}
}

fn last_verified_batch_key(network: NetworkId) -> Vec<u8> {
Self::main_key(b"last_verified_batch", network.encode())
}
pub fn save_last_verified_batch(txn: &mut D::Transaction<'_>, network: NetworkId, id: u32) {
txn.put(Self::last_verified_batch_key(network), id.to_le_bytes());
impl HandoverBatchDb {
pub fn set_handover_batch(txn: &mut impl DbTxn, set: ValidatorSet, batch: u32) {
Self::set(txn, set, &batch);
LookupHandoverBatchDb::set(txn, set.network, batch, &set.session);
}
pub fn last_verified_batch<G: Get>(getter: &G, network: NetworkId) -> Option<u32> {
getter
.get(Self::last_verified_batch_key(network))
.map(|id| u32::from_le_bytes(id.try_into().unwrap()))
}
impl QueuedBatchesDb {
pub fn queue(txn: &mut impl DbTxn, set: ValidatorSet, batch: Transaction) {
let mut batches = Self::get(txn, set).unwrap_or_default();
batch.write(&mut batches).unwrap();
Self::set(txn, set, &batches);
}

fn handover_batch_key(set: ValidatorSet) -> Vec<u8> {
Self::main_key(b"handover_batch", set.encode())
}
fn lookup_handover_batch_key(network: NetworkId, batch: u32) -> Vec<u8> {
Self::main_key(b"lookup_handover_batch", (network, batch).encode())
}
pub fn set_handover_batch(txn: &mut D::Transaction<'_>, set: ValidatorSet, batch: u32) {
txn.put(Self::handover_batch_key(set), batch.to_le_bytes());
txn.put(Self::lookup_handover_batch_key(set.network, batch), set.session.0.to_le_bytes());
}
pub fn handover_batch<G: Get>(getter: &G, set: ValidatorSet) -> Option<u32> {
getter.get(Self::handover_batch_key(set)).map(|id| u32::from_le_bytes(id.try_into().unwrap()))
}
pub fn is_handover_batch<G: Get>(
getter: &G,
network: NetworkId,
batch: u32,
) -> Option<ValidatorSet> {
getter.get(Self::lookup_handover_batch_key(network, batch)).map(|session| ValidatorSet {
network,
session: Session(u32::from_le_bytes(session.try_into().unwrap())),
})
}
pub fn take(txn: &mut impl DbTxn, set: ValidatorSet) -> Vec<Transaction> {
let batches_vec = Self::get(txn, set).unwrap_or_default();
txn.del(&Self::key(set));

fn queued_batches_key(set: ValidatorSet) -> Vec<u8> {
Self::main_key(b"queued_batches", set.encode())
}
pub fn queue_batch(txn: &mut D::Transaction<'_>, set: ValidatorSet, batch: Transaction) {
let key = Self::queued_batches_key(set);
let mut batches = txn.get(&key).unwrap_or(vec![]);
batches.extend(batch.serialize());
txn.put(&key, batches);
}
pub fn take_queued_batches(txn: &mut D::Transaction<'_>, set: ValidatorSet) -> Vec<Transaction> {
let key = Self::queued_batches_key(set);
let batches_vec = txn.get(&key).unwrap_or(vec![]);
txn.del(&key);
let mut batches: &[u8] = &batches_vec;

let mut res = vec![];
while !batches.is_empty() {
res.push(Transaction::read(&mut batches).unwrap());
Expand Down
Loading