Skip to content

Commit

Permalink
\#339 addendum
Browse files Browse the repository at this point in the history
  • Loading branch information
kayabaNerve committed Nov 16, 2023
1 parent d25e3d8 commit 369af0f
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 69 deletions.
195 changes: 158 additions & 37 deletions coordinator/src/cosign_evaluator.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use core::time::Duration;
use std::{
sync::{Arc, Mutex, RwLock},
sync::Arc,
collections::{HashSet, HashMap},
};

use tokio::{sync::mpsc, time::sleep};
use tokio::{
sync::{mpsc, Mutex, RwLock},
time::sleep,
};

use scale::Encode;
use sp_application_crypto::RuntimePublic;
use serai_client::{
primitives::{NETWORKS, NetworkId, Signature},
validator_sets::primitives::{Session, ValidatorSet},
SeraiError, Serai,
SeraiError, TemporalSerai, Serai,
};

use serai_db::{DbTxn, Db};
use serai_db::{Get, DbTxn, Db, create_db};

use processor_messages::coordinator::cosign_block_msg;

Expand All @@ -23,6 +26,13 @@ use crate::{
substrate::SubstrateDb,
};

create_db! {
CosignDb {
ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> Vec<u8>,
DistinctChain: (set: ValidatorSet) -> ()
}
}

pub struct CosignEvaluator<D: Db> {
db: Mutex<D>,
serai: Arc<Serai>,
Expand All @@ -31,14 +41,14 @@ pub struct CosignEvaluator<D: Db> {
}

impl<D: Db> CosignEvaluator<D> {
fn update_latest_cosign(&self) {
let stakes_lock = self.stakes.read().unwrap();
async fn update_latest_cosign(&self) {
let stakes_lock = self.stakes.read().await;
// If we haven't gotten the stake data yet, return
let Some(stakes) = stakes_lock.as_ref() else { return };

let total_stake = stakes.values().cloned().sum::<u64>();

let latest_cosigns = self.latest_cosigns.read().unwrap();
let latest_cosigns = self.latest_cosigns.read().await;
let mut highest_block = 0;
for (block_num, _) in latest_cosigns.values() {
let mut networks = HashSet::new();
Expand All @@ -55,7 +65,7 @@ impl<D: Db> CosignEvaluator<D> {
}
}

let mut db_lock = self.db.lock().unwrap();
let mut db_lock = self.db.lock().await;
let mut txn = db_lock.txn();
if highest_block > SubstrateDb::<D>::latest_cosigned_block(&txn) {
log::info!("setting latest cosigned block to {}", highest_block);
Expand Down Expand Up @@ -85,61 +95,172 @@ impl<D: Db> CosignEvaluator<D> {
}

// Since we've successfully built stakes, set it
*self.stakes.write().unwrap() = Some(stakes);
*self.stakes.write().await = Some(stakes);

self.update_latest_cosign();
self.update_latest_cosign().await;

Ok(())
}

// Uses Err to signify a message should be retried
async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
let Some(block) = self.serai.block(cosign.block).await? else {
log::warn!("received cosign for an unknown block");
// If we already have this cosign or a newer cosign, return
if let Some(latest) = self.latest_cosigns.read().await.get(&cosign.network) {
if latest.0 >= cosign.block_number {
return Ok(());
}
}

// If this an old cosign (older than a day), drop it
let latest_block = self.serai.latest_block().await?;
if (cosign.block_number + (24 * 60 * 60 / 6)) < latest_block.number() {
log::debug!("received old cosign supposedly signed by {:?}", cosign.network);
return Ok(());
};
}

// If this an old cosign, don't bother handling it
if block.number() <
self.latest_cosigns.read().unwrap().get(&cosign.network).map(|cosign| cosign.0).unwrap_or(0)
{
log::debug!("received old cosign from {:?}", cosign.network);
let Some(block) = self.serai.block_by_number(cosign.block_number).await? else {
log::warn!("received cosign with a block number which doesn't map to a block");
return Ok(());
};

async fn set_with_keys_fn(
serai: &TemporalSerai<'_>,
network: NetworkId,
) -> Result<Option<ValidatorSet>, SeraiError> {
let Some(latest_session) = serai.validator_sets().session(network).await? else {
log::warn!("received cosign from {:?}, which doesn't yet have a session", network);
return Ok(None);
};
let prior_session = Session(latest_session.0.saturating_sub(1));
Ok(Some(
if serai
.validator_sets()
.keys(ValidatorSet { network, session: prior_session })
.await?
.is_some()
{
ValidatorSet { network, session: prior_session }
} else {
ValidatorSet { network, session: latest_session }
},
))
}

// Get the key for this network as of the prior block
// If we have two chains, this value may be different across chains depending on if one chain
// included the set_keys and one didn't
// Because set_keys will force a cosign, it will force detection of distinct blocks
// re: set_keys using keys prior to set_keys (assumed amenable to all)
let serai = self.serai.as_of(block.header().parent_hash.into());

let Some(latest_session) = serai.validator_sets().session(cosign.network).await? else {
log::warn!("received cosign from {:?}, which doesn't yet have a session", cosign.network);
let Some(set_with_keys) = set_with_keys_fn(&serai, cosign.network).await? else {
return Ok(());
};
let prior_session = Session(latest_session.0.saturating_sub(1));
let set_with_keys = if serai
.validator_sets()
.keys(ValidatorSet { network: cosign.network, session: prior_session })
.await?
.is_some()
{
ValidatorSet { network: cosign.network, session: prior_session }
} else {
ValidatorSet { network: cosign.network, session: latest_session }
};

let Some(keys) = serai.validator_sets().keys(set_with_keys).await? else {
log::warn!("received cosign for a block we didn't have keys for");
return Ok(());
};

if !keys.0.verify(&cosign_block_msg(cosign.block), &Signature(cosign.signature)) {
if !keys
.0
.verify(&cosign_block_msg(cosign.block_number, cosign.block), &Signature(cosign.signature))
{
log::warn!("received cosigned block with an invalid signature");
return Ok(());
}

log::info!("received cosign for block {} by {:?}", block.number(), cosign.network);
self.latest_cosigns.write().unwrap().insert(cosign.network, (block.number(), cosign));
log::info!(
"received cosign for block {} ({}) by {:?}",
block.number(),
hex::encode(cosign.block),
cosign.network
);

self.update_latest_cosign();
// Save this cosign to the DB
{
let mut db = self.db.lock().await;
let mut txn = db.txn();
ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign.encode());
txn.commit();
}

if cosign.block != block.hash() {
log::error!(
"received cosign for a distinct block at {}. we have {}. cosign had {}",
cosign.block_number,
hex::encode(block.hash()),
hex::encode(cosign.block)
);

let serai = self.serai.as_of(latest_block.hash());

let mut db = self.db.lock().await;
// Save this set as being on a different chain
let mut txn = db.txn();
DistinctChain::set(&mut txn, set_with_keys, &());
txn.commit();

let mut total_stake = 0;
let mut total_on_distinct_chain = 0;
for network in NETWORKS {
if network == NetworkId::Serai {
continue;
}

// Get the current set for this network
let set_with_keys = {
let mut res;
while {
res = set_with_keys_fn(&serai, cosign.network).await;
res.is_err()
} {
log::error!(
"couldn't get the set with keys when checking for a distinct chain: {:?}",
res
);
tokio::time::sleep(core::time::Duration::from_secs(3)).await;
}
res.unwrap()
};

// Get its stake
// Doesn't use the stakes inside self to prevent deadlocks re: multi-lock acquisition
if let Some(set_with_keys) = set_with_keys {
let stake = {
let mut res;
while {
res = serai.validator_sets().total_allocated_stake(set_with_keys.network).await;
res.is_err()
} {
log::error!(
"couldn't get total allocated stake when checking for a distinct chain: {:?}",
res
);
tokio::time::sleep(core::time::Duration::from_secs(3)).await;
}
res.unwrap()
};

if let Some(stake) = stake {
total_stake += stake.0;

if DistinctChain::get(&*db, set_with_keys).is_some() {
total_on_distinct_chain += stake.0;
}
}
}
}

// See https://github.com/serai-dex/serai/issues/339 for the reasoning on 17%
if (total_stake * 17 / 100) <= total_on_distinct_chain {
panic!("17% of validator sets (by stake) have co-signed a distinct chain");
}
} else {
let mut latest_cosigns = self.latest_cosigns.write().await;

latest_cosigns.insert(cosign.network, (block.number(), cosign));
self.update_latest_cosign().await;
}

Ok(())
}
Expand Down Expand Up @@ -191,7 +312,7 @@ impl<D: Db> CosignEvaluator<D> {
let cosigns = evaluator
.latest_cosigns
.read()
.unwrap()
.await
.values()
.map(|cosign| cosign.1)
.collect::<Vec<_>>();
Expand Down
3 changes: 2 additions & 1 deletion coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,10 @@ async fn handle_processor_message<D: Db, P: P2p>(
coordinator::ProcessorMessage::SubstrateShare { id, .. } => {
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
coordinator::ProcessorMessage::CosignedBlock { block, signature } => {
coordinator::ProcessorMessage::CosignedBlock { block_number, block, signature } => {
let cosigned_block = CosignedBlock {
network,
block_number: *block_number,
block: *block,
signature: {
let mut arr = [0; 64];
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const LIBP2P_TOPIC: &str = "serai-coordinator";
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
pub struct CosignedBlock {
pub network: NetworkId,
pub block_number: u64,
pub block: [u8; 32],
pub signature: [u8; 64],
}
Expand Down
12 changes: 11 additions & 1 deletion coordinator/src/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::{sync::mpsc, time::sleep};
use crate::{
Db,
processors::Processors,
tributary::{TributarySpec, TributaryDb},
tributary::{TributarySpec, SeraiBlockNumber, TributaryDb},
};

mod db;
Expand Down Expand Up @@ -456,6 +456,16 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
let maximally_latent_cosign_block =
skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE);
for block in (last_intended_to_cosign_block + 1) ..= latest_number {
SeraiBlockNumber::set(
&mut txn,
serai
.block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized")
.hash(),
&block,
);

let mut set = false;

let block_has_events = block_has_events(&mut txn, serai, block).await?;
Expand Down
6 changes: 6 additions & 0 deletions coordinator/src/tributary/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ pub use serai_db::*;

use crate::tributary::TributarySpec;

create_db! {
NewTributaryDb {
SeraiBlockNumber: (hash: [u8; 32]) -> u64
}
}

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Topic {
Dkg,
Expand Down
6 changes: 4 additions & 2 deletions coordinator/src/tributary/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use serai_db::{Get, Db};
use crate::{
processors::Processors,
tributary::{
Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, DataSet, Accumulation,
TributaryState,
Transaction, TributarySpec, SeraiBlockNumber, Topic, DataSpecification, TributaryDb, DataSet,
Accumulation, TributaryState,
nonce_decider::NonceDecider,
dkg_confirmer::DkgConfirmer,
scanner::{RecognizedIdType, RIDTrait},
Expand Down Expand Up @@ -528,6 +528,8 @@ pub(crate) async fn handle_application_tx<
id: SubstrateSignableId::CosigningSubstrateBlock(hash),
attempt: 0,
},
block_number: SeraiBlockNumber::get(txn, hash)
.expect("CosignSubstrateBlock yet didn't save Serai block number"),
},
)
.await;
Expand Down
Loading

0 comments on commit 369af0f

Please sign in to comment.