Skip to content

Commit

Permalink
Add Batch messages from processor, verify Batches published on-chain
Browse files Browse the repository at this point in the history
Renames Update to SignedBatch.

Checks Batch equality via a hash of the InInstructions. This avoids needing
to keep the full Batch in node state or introspect TXs.
  • Loading branch information
kayabaNerve committed Sep 29, 2023
1 parent 0be567f commit 0eff3d9
Show file tree
Hide file tree
Showing 18 changed files with 281 additions and 80 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 34 additions & 1 deletion coordinator/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
use core::marker::PhantomData;

use blake2::{
digest::{consts::U32, Digest},
Blake2b,
};

use scale::{Encode, Decode};
use serai_client::{primitives::NetworkId, in_instructions::primitives::SignedBatch};
use serai_client::{
primitives::NetworkId,
in_instructions::primitives::{Batch, SignedBatch},
};

pub use serai_db::*;

Expand Down Expand Up @@ -87,6 +95,19 @@ impl<D: Db> MainDb<D> {
getter.get(Self::first_preprocess_key(network, id))
}

/// DB key under which the expected hash for the `Batch` with this (network, id) is stored.
fn expected_batch_key(network: NetworkId, id: u32) -> Vec<u8> {
  let scope = (network, id).encode();
  Self::main_key(b"expected_batch", scope)
}
/// Record the hash of this `Batch`'s `InInstructions` so the on-chain `Batch` can later be
/// verified without keeping the full `Batch` in node state.
pub fn save_expected_batch(txn: &mut D::Transaction<'_>, batch: &Batch) {
  // Only the instructions are hashed; network/id are carried in the key itself.
  let instructions_hash = Blake2b::<U32>::digest(batch.instructions.encode());
  txn.put(Self::expected_batch_key(batch.network, batch.id), instructions_hash);
}
/// Fetch the expected instructions hash for this (network, id), if one was saved.
pub fn expected_batch<G: Get>(getter: &G, network: NetworkId, id: u32) -> Option<[u8; 32]> {
  let raw = getter.get(Self::expected_batch_key(network, id))?;
  // Stored values are always 32-byte Blake2b digests, so this conversion can't fail.
  Some(raw.try_into().unwrap())
}

/// DB key under which the `SignedBatch` with this (network, id) is stored.
fn batch_key(network: NetworkId, id: u32) -> Vec<u8> {
  let scope = (network, id).encode();
  Self::main_key(b"batch", scope)
}
Expand All @@ -98,4 +119,16 @@ impl<D: Db> MainDb<D> {
.get(Self::batch_key(network, id))
.map(|batch| SignedBatch::decode(&mut batch.as_ref()).unwrap())
}

/// DB key tracking the highest `Batch` id verified against the chain for this network.
fn last_verified_batch_key(network: NetworkId) -> Vec<u8> {
  let scope = network.encode();
  Self::main_key(b"last_verified_batch", scope)
}
/// Persist the id of the most recently verified `Batch` for this network.
pub fn save_last_verified_batch(txn: &mut D::Transaction<'_>, network: NetworkId, id: u32) {
  // Stored little-endian; `last_verified_batch` decodes with the same endianness.
  let encoded_id = id.to_le_bytes();
  txn.put(Self::last_verified_batch_key(network), encoded_id);
}
/// Fetch the id of the most recently verified `Batch` for this network, if any.
pub fn last_verified_batch<G: Get>(getter: &G, network: NetworkId) -> Option<u32> {
  let raw = getter.get(Self::last_verified_batch_key(network))?;
  // Values are always written as 4 little-endian bytes, so the conversion can't fail.
  Some(u32::from_le_bytes(raw.try_into().unwrap()))
}
}
72 changes: 62 additions & 10 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod processors;
use processors::Processors;

mod substrate;
use substrate::SubstrateDb;

#[cfg(test)]
pub mod tests;
Expand Down Expand Up @@ -118,7 +119,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
) {
log::info!("scanning substrate");

let mut db = substrate::SubstrateDb::new(db);
let mut db = SubstrateDb::new(db);
let mut next_substrate_block = db.next_block();

let new_substrate_block_notifier = {
Expand Down Expand Up @@ -565,16 +566,16 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
ProcessorMessage::Sign(inner_msg) => match inner_msg {
// We'll only receive Preprocess and Share if we're actively signing
sign::ProcessorMessage::Preprocess { id, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
sign::ProcessorMessage::Share { id, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
// While the Processor's Scanner will always emit Completed, that's routed through the
// Signer and only becomes a ProcessorMessage::Completed if the Signer is present and
// confirms it
sign::ProcessorMessage::Completed { key, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, key).unwrap())
Some(SubstrateDb::<D>::session_for_key(&txn, key).unwrap())
}
},
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
Expand Down Expand Up @@ -606,19 +607,69 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
}
// We'll only fire these if we are the Substrate signer, making the Tributary relevant
coordinator::ProcessorMessage::BatchPreprocess { id, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
coordinator::ProcessorMessage::BatchShare { id, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
},
// These don't return a relevant Tributary as there's no Tributary with action expected
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
processor_messages::substrate::ProcessorMessage::Batch { batch } => {
assert_eq!(
batch.network, msg.network,
"processor sent us a batch for a different network than it was for",
);
let this_batch_id = batch.id;
MainDb::<D>::save_expected_batch(&mut txn, batch);

// Re-define batch
// We can't drop it, yet it shouldn't be accidentally used in the following block
#[allow(clippy::let_unit_value)]
let batch = ();
#[allow(clippy::let_unit_value)]
let _ = batch;

// Verify all `Batch`s which we've already indexed from Substrate
// This won't be complete, as it only runs when a `Batch` message is received, which
// will be before we get a `SignedBatch`. It is, however, incremental. We can use a
// complete version to finish the last section when we need a complete version.
let last = MainDb::<D>::last_verified_batch(&txn, msg.network);
// This variable exists so Rust can verify Send/Sync properties
let mut faulty = None;
for id in last.map(|last| last + 1).unwrap_or(0) ..= this_batch_id {
if let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(&txn, network, id) {
let off_chain = MainDb::<D>::expected_batch(&txn, network, id).unwrap();
if on_chain != off_chain {
faulty = Some((id, off_chain, on_chain));
break;
}
MainDb::<D>::save_last_verified_batch(&mut txn, msg.network, id);
}
}

if let Some((id, off_chain, on_chain)) = faulty {
// Halt operations on this network and spin, as this is a critical fault
loop {
log::error!(
"{}! network: {:?} id: {} off-chain: {} on-chain: {}",
"on-chain batch doesn't match off-chain",
network,
id,
hex::encode(off_chain),
hex::encode(on_chain),
);
sleep(Duration::from_secs(60)).await;
}
}

None
}
// If this is a new Batch, immediately publish it (if we can)
// This doesn't return a relevant Tributary as there's no Tributary with action expected
processor_messages::substrate::ProcessorMessage::Update { batch } => {
processor_messages::substrate::ProcessorMessage::SignedBatch { batch } => {
assert_eq!(
batch.batch.network, msg.network,
"processor sent us a batch for a different network than it was for",
"processor sent us a signed batch for a different network than it was for",
);
// TODO: Check this key's key pair's substrate key is authorized to publish batches

Expand Down Expand Up @@ -849,7 +900,8 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
}
},
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
processor_messages::substrate::ProcessorMessage::Update { .. } => unreachable!(),
processor_messages::substrate::ProcessorMessage::Batch { .. } => unreachable!(),
processor_messages::substrate::ProcessorMessage::SignedBatch { .. } => unreachable!(),
},
};

Expand Down
24 changes: 23 additions & 1 deletion coordinator/src/substrate/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use scale::{Encode, Decode};

pub use serai_db::*;

use serai_client::validator_sets::primitives::{Session, KeyPair};
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, KeyPair},
};

#[derive(Debug)]
pub struct SubstrateDb<D: Db>(pub D);
Expand Down Expand Up @@ -55,4 +58,23 @@ impl<D: Db> SubstrateDb<D> {
txn.put(key_0, session.clone());
txn.put(Self::session_key(&key_pair.1), session);
}

/// DB key under which the on-chain instructions hash for this (network, id) `Batch` is stored.
fn batch_instructions_key(network: NetworkId, id: u32) -> Vec<u8> {
  let scope = (network, id).encode();
  Self::substrate_key(b"batch", scope)
}
/// Fetch the instructions hash indexed from the on-chain `Batch` event for this (network, id),
/// if one has been saved.
pub fn batch_instructions_hash<G: Get>(
  getter: &G,
  network: NetworkId,
  id: u32,
) -> Option<[u8; 32]> {
  let raw = getter.get(Self::batch_instructions_key(network, id))?;
  // Stored values are always 32-byte hashes, so this conversion can't fail.
  Some(raw.try_into().unwrap())
}
/// Persist the instructions hash observed in an on-chain `Batch` event, keyed by (network, id),
/// so it can later be compared against the off-chain expected hash.
pub fn save_batch_instructions_hash(
  txn: &mut D::Transaction<'_>,
  network: NetworkId,
  id: u32,
  hash: [u8; 32],
) {
  let key = Self::batch_instructions_key(network, id);
  txn.put(key, hash);
}
}
13 changes: 10 additions & 3 deletions coordinator/src/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ async fn handle_key_gen<D: Db, Pro: Processors>(
Ok(())
}

async fn handle_batch_and_burns<Pro: Processors>(
async fn handle_batch_and_burns<D: Db, Pro: Processors>(
db: &mut D,
processors: &Pro,
serai: &Serai,
block: &Block,
Expand All @@ -149,9 +150,15 @@ async fn handle_batch_and_burns<Pro: Processors>(
let mut burns = HashMap::new();

for batch in serai.get_batch_events(hash).await? {
if let InInstructionsEvent::Batch { network, id, block: network_block } = batch {
if let InInstructionsEvent::Batch { network, id, block: network_block, instructions_hash } =
batch
{
network_had_event(&mut burns, &mut batches, network);

let mut txn = db.txn();
SubstrateDb::<D>::save_batch_instructions_hash(&mut txn, network, id, instructions_hash);
txn.commit();

// Make sure this is the only Batch event for this network in this Block
assert!(batch_block.insert(network, network_block).is_none());

Expand Down Expand Up @@ -277,7 +284,7 @@ async fn handle_block<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Proces
// This does break the uniqueness of (hash, event_id) -> one event, yet
// (network, (hash, event_id)) remains valid as a unique ID for an event
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
handle_batch_and_burns(processors, serai, &block).await?;
handle_batch_and_burns(&mut db.0, processors, serai, &block).await?;
}
let mut txn = db.0.txn();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
Expand Down
17 changes: 11 additions & 6 deletions docs/coordinator/Coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,28 @@ node.
This document primarily details its flow with regards to the Serai node and
processor.

## New Set Event
### New Set Event

On `validator_sets::pallet::Event::NewSet`, the coordinator spawns a tributary
for the new set. It additionally sends the processor
`key_gen::CoordinatorMessage::GenerateKey`.

## Key Generation Event
### Key Generation Event

On `validator_sets::pallet::Event::KeyGen`, the coordinator sends
`substrate::CoordinatorMessage::ConfirmKeyPair` to the processor.

# Update
### Batch

On `key_gen::ProcessorMessage::Update`, the coordinator publishes an unsigned
transaction containing the signed batch to the Serai blockchain.
On `substrate::ProcessorMessage::Batch`, the coordinator notes what the on-chain
`Batch` should be, for verification once published.

# Sign Completed
### SignedBatch

On `substrate::ProcessorMessage::SignedBatch`, the coordinator publishes an
unsigned transaction containing the signed batch to the Serai blockchain.

### Sign Completed

On `sign::ProcessorMessage::Completed`, the coordinator makes a tributary
transaction containing the transaction hash the signing process was supposedly
Expand Down
Loading

0 comments on commit 0eff3d9

Please sign in to comment.