Reattempts (#483)
* Schedule re-attempts and add a (not filled out) match statement to actually execute them

A comment explains the methodology. To copy it here:

"""
This is because we *always* re-attempt any protocol which had participation. That doesn't
mean we *should* re-attempt this protocol.

The alternatives were:
1) Note on-chain we completed a protocol, halting re-attempts upon 34%.
2) Vote on-chain to re-attempt a protocol.

This schema doesn't send any additional messages in the success case (whereas
alternative #1 does) and doesn't have the overhead of alternative #2 (which sends votes
and then preprocesses; this scheme only sends preprocesses).
"""

Any signing protocol which reaches sufficient participation will be
re-attempted until it no longer does.
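
A minimal sketch of that rule follows. The names (`Reattempts`, `blocks_per_attempt`, the 32-byte topic key) are illustrative assumptions, not the coordinator's actual API:

```rust
use std::collections::HashMap;

/// Hypothetical sketch of the re-attempt rule described above.
struct Reattempts {
  // Topic -> block number at which to re-attempt it.
  scheduled: HashMap<[u8; 32], u64>,
}

impl Reattempts {
  /// Call when a protocol reaches 2/3rds participation in its current attempt.
  fn schedule(&mut self, topic: [u8; 32], current_block: u64, blocks_per_attempt: u64) {
    // Schedule at most one re-attempt per attempt, even if we later also see
    // 2/3rds of shares after having seen 2/3rds of preprocesses.
    self.scheduled.entry(topic).or_insert(current_block + blocks_per_attempt);
  }

  /// Call on each new block. Returns the topics now due for a re-attempt.
  fn due(&mut self, current_block: u64) -> Vec<[u8; 32]> {
    let due: Vec<[u8; 32]> =
      self.scheduled.iter().filter(|(_, at)| **at <= current_block).map(|(t, _)| *t).collect();
    for topic in &due {
      self.scheduled.remove(topic);
    }
    due
  }
}
```

Since any protocol with sufficient participation is unconditionally re-scheduled, a protocol which completed simply stops gathering participation, and its re-attempts die out without any extra on-chain message.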

* Have the Substrate scanner track DKG removals/completions for the Tributary code

* Don't keep trying to publish a participant removal if we've already set keys

* Pad out the re-attempt match a bit more

* Have CosignEvaluator reload from the DB

* Correctly schedule cosign re-attempts

* Actually spawn new DKG removal attempts

* Use u32 for Batch ID in SubstrateSignableId, finish Batch re-attempt routing

The batch ID was an opaque [u8; 5] which also encoded the network, yet the network is
already known from context, making its inclusion redundant and unhelpful.
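
The resulting shape is roughly the following (a sketch inferred from the diffs below; the real enum has more variants, and the [u8; 5] presumably being a network byte plus a 4-byte ID is an assumption):

```rust
/// Sketch: the network is known from the tributary's context, so the Batch
/// variant only carries the sequential batch number.
pub enum SubstrateSignableId {
  // Previously `Batch([u8; 5])`: presumably a network byte plus a 4-byte opaque ID.
  Batch(u32),
  // ... other variants elided
}

/// The routing layer round-trips the ID as little-endian bytes, matching the
/// diff's `id.to_le_bytes()` / `u32::from_le_bytes(id.try_into().unwrap())` pair.
fn batch_id_to_bytes(id: u32) -> Vec<u8> {
  id.to_le_bytes().to_vec()
}

fn batch_id_from_bytes(bytes: Vec<u8>) -> u32 {
  u32::from_le_bytes(bytes.try_into().unwrap())
}
```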

* Clarify a pair of TODOs in the coordinator

* Remove old TODO

* Final comment cleanup

* Correct usage of TARGET_BLOCK_TIME in reattempt scheduler

It's in ms and I assumed it was in s.
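
For instance (a sketch; `TARGET_BLOCK_TIME` is a real constant but its value and this helper are illustrative assumptions):

```rust
// TARGET_BLOCK_TIME is in milliseconds. Treating it as seconds inflates the
// computed block count by 1000x.
const TARGET_BLOCK_TIME: u64 = 6000; // ms (illustrative value)

/// Blocks to wait before re-attempting, given a desired delay in seconds.
fn reattempt_delay_in_blocks(delay_secs: u64) -> u64 {
  // Convert seconds to ms before dividing by the ms-denominated block time.
  ((delay_secs * 1000) + (TARGET_BLOCK_TIME - 1)) / TARGET_BLOCK_TIME
}
```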

* Have coordinator tests drop BatchReattempts which aren't relevant, yet may exist

* Bug fix and pointless oddity removal

We scheduled a re-attempt upon receiving 2/3rds of preprocesses and again upon
receiving 2/3rds of shares, so any signing protocol could cause two re-attempts
instead of one.
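
A sketch of the fix (the shape is hypothetical; the actual handler differs): only the preprocess quorum schedules the re-attempt, since reaching the share quorum implies the preprocess quorum already fired.

```rust
enum Label {
  Preprocess,
  Share,
}

fn on_two_thirds_quorum(label: Label, mut schedule_reattempt: impl FnMut()) {
  match label {
    // One re-attempt per attempt: only the first quorum milestone schedules it.
    Label::Preprocess => schedule_reattempt(),
    // 2/3rds of shares implies 2/3rds of preprocesses was already reached and
    // already scheduled the re-attempt, so don't schedule a second one.
    Label::Share => {}
  }
}
```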

The coordinator tests randomly generated the Batch ID since it was previously an
opaque byte array. While that didn't break the test, it was pointless and made the
already-succeeded check before re-attempting impossible to hit.

* Add log statements, correct a deadlock in coordinator tests

* Increase pessimistic timeout on recv_message to compensate for tighter best-case timeouts

* Further bump timeout by a minute

AFAICT, GH failed by just a few seconds.

This is also a worst-case timeout for a single instance, making it fine to be decently long.

* Further further bump timeout due to lack of distinct error
kayabaNerve authored Dec 12, 2023
1 parent b297b79 commit 6a17282
Showing 17 changed files with 437 additions and 191 deletions.
coordinator/src/cosign_evaluator.rs (24 additions, 19 deletions)

@@ -9,7 +9,7 @@ use tokio::{
   time::sleep,
 };
 
-use scale::Encode;
+use borsh::BorshSerialize;
 use sp_application_crypto::RuntimePublic;
 use serai_client::{
   primitives::{NETWORKS, NetworkId, Signature},
@@ -28,7 +28,8 @@ use crate::{
 
 create_db! {
   CosignDb {
-    ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> Vec<u8>,
+    ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> CosignedBlock,
+    LatestCosign: (network: NetworkId) -> CosignedBlock,
     DistinctChain: (set: ValidatorSet) -> (),
   }
 }
@@ -37,7 +38,7 @@ pub struct CosignEvaluator<D: Db> {
   db: Mutex<D>,
   serai: Arc<Serai>,
   stakes: RwLock<Option<HashMap<NetworkId, u64>>>,
-  latest_cosigns: RwLock<HashMap<NetworkId, (u64, CosignedBlock)>>,
+  latest_cosigns: RwLock<HashMap<NetworkId, CosignedBlock>>,
 }
 
 impl<D: Db> CosignEvaluator<D> {
@@ -50,18 +51,18 @@ impl<D: Db> CosignEvaluator<D> {
 
     let latest_cosigns = self.latest_cosigns.read().await;
     let mut highest_block = 0;
-    for (block_num, _) in latest_cosigns.values() {
+    for cosign in latest_cosigns.values() {
       let mut networks = HashSet::new();
-      for (network, (sub_block_num, _)) in &*latest_cosigns {
-        if sub_block_num >= block_num {
+      for (network, sub_cosign) in &*latest_cosigns {
+        if sub_cosign.block_number >= cosign.block_number {
           networks.insert(network);
         }
       }
       let sum_stake =
         networks.into_iter().map(|network| stakes.get(network).unwrap_or(&0)).sum::<u64>();
       let needed_stake = ((total_stake * 2) / 3) + 1;
       if (total_stake == 0) || (sum_stake > needed_stake) {
-        highest_block = highest_block.max(*block_num);
+        highest_block = highest_block.max(cosign.block_number);
       }
     }
 
@@ -106,7 +107,7 @@ impl<D: Db> CosignEvaluator<D> {
   async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
     // If we already have this cosign or a newer cosign, return
     if let Some(latest) = self.latest_cosigns.read().await.get(&cosign.network) {
-      if latest.0 >= cosign.block_number {
+      if latest.block_number >= cosign.block_number {
         return Ok(());
       }
     }
@@ -180,7 +181,8 @@ impl<D: Db> CosignEvaluator<D> {
     {
       let mut db = self.db.lock().await;
       let mut txn = db.txn();
-      ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign.encode());
+      ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign);
+      LatestCosign::set(&mut txn, set_with_keys.network, &(cosign));
       txn.commit();
     }
 
@@ -258,7 +260,7 @@ impl<D: Db> CosignEvaluator<D> {
       } else {
         {
           let mut latest_cosigns = self.latest_cosigns.write().await;
-          latest_cosigns.insert(cosign.network, (block.number(), cosign));
+          latest_cosigns.insert(cosign.network, cosign);
         }
         self.update_latest_cosign().await;
       }
@@ -268,11 +270,18 @@ impl<D: Db> CosignEvaluator<D> {
 
   #[allow(clippy::new_ret_no_self)]
   pub fn new<P: P2p>(db: D, p2p: P, serai: Arc<Serai>) -> mpsc::UnboundedSender<CosignedBlock> {
+    let mut latest_cosigns = HashMap::new();
+    for network in NETWORKS {
+      if let Some(cosign) = LatestCosign::get(&db, network) {
+        latest_cosigns.insert(network, cosign);
+      }
+    }
+
     let evaluator = Arc::new(Self {
       db: Mutex::new(db),
       serai,
       stakes: RwLock::new(None),
-      latest_cosigns: RwLock::new(HashMap::new()),
+      latest_cosigns: RwLock::new(latest_cosigns),
    });
 
     // Spawn a task to update stakes regularly
@@ -310,15 +319,11 @@ impl<D: Db> CosignEvaluator<D> {
     tokio::spawn({
       async move {
         loop {
-          let cosigns = evaluator
-            .latest_cosigns
-            .read()
-            .await
-            .values()
-            .map(|cosign| cosign.1)
-            .collect::<Vec<_>>();
+          let cosigns = evaluator.latest_cosigns.read().await.values().cloned().collect::<Vec<_>>();
           for cosign in cosigns {
-            P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, cosign.encode()).await;
+            let mut buf = vec![];
+            cosign.serialize(&mut buf).unwrap();
+            P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await;
           }
           sleep(Duration::from_secs(60)).await;
         }

coordinator/src/main.rs (6 additions, 3 deletions)

@@ -18,6 +18,7 @@ use frost::Participant;
 use serai_db::{DbTxn, Db};
 
 use scale::Encode;
+use borsh::BorshSerialize;
 use serai_client::{
   primitives::NetworkId,
   validator_sets::primitives::{Session, ValidatorSet, KeyPair},
@@ -248,7 +249,9 @@ async fn handle_processor_message<D: Db, P: P2p>(
         },
       };
       cosign_channel.send(cosigned_block).unwrap();
-      P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, cosigned_block.encode()).await;
+      let mut buf = vec![];
+      cosigned_block.serialize(&mut buf).unwrap();
+      P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await;
       None
     }
   },
@@ -555,7 +558,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
           let SubstrateSignableId::Batch(id) = id.id else {
             panic!("BatchPreprocess SubstrateSignableId wasn't Batch")
           };
-          id.encode()
+          id.to_le_bytes()
         },
         preprocesses.into_iter().map(Into::into).collect(),
       );
@@ -1057,7 +1060,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
       let mut tx = match id_type {
         RecognizedIdType::Batch => Transaction::SubstrateSign(SignData {
           data: get_preprocess(&raw_db, id_type, &id).await,
-          plan: SubstrateSignableId::Batch(id.as_slice().try_into().unwrap()),
+          plan: SubstrateSignableId::Batch(u32::from_le_bytes(id.try_into().unwrap())),
           label: Label::Preprocess,
           attempt: 0,
           signed: Transaction::empty_signed(),

coordinator/src/p2p.rs (3 additions, 4 deletions)

@@ -8,7 +8,7 @@ use std::{
 
 use async_trait::async_trait;
 
-use scale::{Encode, Decode};
+use borsh::{BorshSerialize, BorshDeserialize};
 use serai_client::primitives::NetworkId;
 
 use serai_db::Db;
@@ -39,7 +39,7 @@ use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
 
 const LIBP2P_TOPIC: &str = "serai-coordinator";
 
-#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
 pub struct CosignedBlock {
   pub network: NetworkId,
   pub block_number: u64,
@@ -705,8 +705,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
         }
       }
       P2pMessageKind::CosignedBlock => {
-        let mut msg_ref: &[u8] = msg.msg.as_ref();
-        let Ok(msg) = CosignedBlock::decode(&mut scale::IoReader(&mut msg_ref)) else {
+        let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
          log::error!("received CosignedBlock message with invalidly serialized contents");
          continue;
        };

coordinator/src/substrate/mod.rs (42 additions, 37 deletions)

@@ -12,7 +12,7 @@ use serai_client::{
   SeraiError, Block, Serai, TemporalSerai,
   primitives::{BlockHash, NetworkId},
   validator_sets::{
-    primitives::{ValidatorSet, KeyPair, amortize_excess_key_shares},
+    primitives::{ValidatorSet, amortize_excess_key_shares},
     ValidatorSetsEvent,
   },
   in_instructions::InInstructionsEvent,
@@ -25,7 +25,11 @@ use processor_messages::SubstrateContext;
 
 use tokio::{sync::mpsc, time::sleep};
 
-use crate::{Db, processors::Processors, tributary::TributarySpec};
+use crate::{
+  Db,
+  processors::Processors,
+  tributary::{TributarySpec, SeraiDkgRemoval, SeraiDkgCompleted},
+};
 
 mod db;
 pub use db::*;
@@ -114,37 +118,6 @@ async fn handle_new_set<D: Db>(
   Ok(())
 }
 
-async fn handle_key_gen<Pro: Processors>(
-  processors: &Pro,
-  serai: &Serai,
-  block: &Block,
-  set: ValidatorSet,
-  key_pair: KeyPair,
-) -> Result<(), SeraiError> {
-  processors
-    .send(
-      set.network,
-      processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
-        context: SubstrateContext {
-          serai_time: block.time().unwrap() / 1000,
-          network_latest_finalized_block: serai
-            .as_of(block.hash())
-            .in_instructions()
-            .latest_block_for_network(set.network)
-            .await?
-            // The processor treats this as a magic value which will cause it to find a network
-            // block which has a time greater than or equal to the Serai time
-            .unwrap_or(BlockHash([0; 32])),
-        },
-        session: set.session,
-        key_pair,
-      },
-    )
-    .await;
-
-  Ok(())
-}
-
 async fn handle_batch_and_burns<Pro: Processors>(
   txn: &mut impl DbTxn,
   processors: &Pro,
@@ -249,6 +222,19 @@ async fn handle_block<D: Db, Pro: Processors>(
   // Define an indexed event ID.
   let mut event_id = 0;
 
+  if HandledEvent::is_unhandled(db, hash, event_id) {
+    let mut txn = db.txn();
+    for removal in serai.as_of(hash).validator_sets().participant_removed_events().await? {
+      let ValidatorSetsEvent::ParticipantRemoved { set, removed } = removal else {
+        panic!("ParticipantRemoved event wasn't ParticipantRemoved: {removal:?}");
+      };
+      SeraiDkgRemoval::set(&mut txn, set, removed.0, &());
+    }
+    HandledEvent::handle_event(&mut txn, hash, event_id);
+    txn.commit();
+  }
+  event_id += 1;
+
   // If a new validator set was activated, create tributary/inform processor to do a DKG
   for new_set in serai.as_of(hash).validator_sets().new_set_events().await? {
     // Individually mark each event as handled so on reboot, we minimize duplicates
@@ -279,12 +265,31 @@ async fn handle_block<D: Db, Pro: Processors>(
   for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
     if HandledEvent::is_unhandled(db, hash, event_id) {
       log::info!("found fresh key gen event {:?}", key_gen);
-      if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
-        handle_key_gen(processors, serai, &block, set, key_pair).await?;
-      } else {
+      let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen else {
         panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
-      }
+      };
+      processors
+        .send(
+          set.network,
+          processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
+            context: SubstrateContext {
+              serai_time: block.time().unwrap() / 1000,
+              network_latest_finalized_block: serai
+                .as_of(block.hash())
+                .in_instructions()
+                .latest_block_for_network(set.network)
+                .await?
+                // The processor treats this as a magic value which will cause it to find a network
+                // block which has a time greater than or equal to the Serai time
+                .unwrap_or(BlockHash([0; 32])),
+            },
+            session: set.session,
+            key_pair,
+          },
+        )
+        .await;
       let mut txn = db.txn();
+      SeraiDkgCompleted::set(&mut txn, set, &());
       HandledEvent::handle_event(&mut txn, hash, event_id);
       txn.commit();
     }

coordinator/src/tests/tributary/mod.rs (5 additions, 8 deletions)

@@ -223,27 +223,24 @@ fn serialize_transaction() {
   {
     let mut block = [0; 32];
     OsRng.fill_bytes(&mut block);
-    let mut batch = [0; 5];
-    OsRng.fill_bytes(&mut batch);
+    let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
     test_read_write(Transaction::Batch { block, batch });
   }
   test_read_write(Transaction::SubstrateBlock(OsRng.next_u64()));
 
   {
-    let mut plan = [0; 5];
-    OsRng.fill_bytes(&mut plan);
+    let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
     test_read_write(Transaction::SubstrateSign(random_sign_data(
       &mut OsRng,
-      SubstrateSignableId::Batch(plan),
+      SubstrateSignableId::Batch(batch),
       Label::Preprocess,
     )));
  }
  {
-    let mut plan = [0; 5];
-    OsRng.fill_bytes(&mut plan);
+    let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
     test_read_write(Transaction::SubstrateSign(random_sign_data(
       &mut OsRng,
-      SubstrateSignableId::Batch(plan),
+      SubstrateSignableId::Batch(batch),
       Label::Share,
     )));
  }