diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index e52b7fdc8..41ffd527b 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -175,9 +175,8 @@ pub async fn scan_substrate( // Save it to the database MainDb::new(db).add_active_tributary(&spec); - // Add it to the queue - // If we reboot before this is read from the queue, the fact it was saved to the database - // means it'll be handled on reboot + // If we reboot before this is read, the fact it was saved to the database means it'll be + // handled on reboot new_tributary_spec.send(spec).unwrap(); }, &processors, @@ -416,8 +415,8 @@ pub async fn handle_p2p( // Have up to three nodes respond let responders = u64::from(tributary.spec.n().min(3)); - // Decide which nodes will respond by using the latest block's hash as a mutually agreed - // upon entropy source + // Decide which nodes will respond by using the latest block's hash as a mutually + // agreed upon entropy source // This isn't a secure source of entropy, yet it's fine for this let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) @@ -502,7 +501,7 @@ pub async fn publish_signed_transaction( } pub async fn handle_processors( - mut db: D, + db: D, key: Zeroizing<::F>, serai: Arc, mut processors: Pro, @@ -510,318 +509,364 @@ pub async fn handle_processors( ) { let pub_key = Ristretto::generator() * key.deref(); - let mut tributaries = HashMap::new(); - loop { - while let Ok(tributary) = { - match new_tributary.try_recv() { - Ok(tributary) => Ok(tributary), - Err(broadcast::error::TryRecvError::Empty) => Err(()), - Err(broadcast::error::TryRecvError::Lagged(_)) => { - panic!("handle_processors lagged to handle new_tributary") - } - Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), - } - } { - tributaries.insert(tributary.spec.genesis(), tributary); - } - - // TODO: Dispatch this message to a task dedicated to handling this processor, preventing one - // processor from holding up all the others. This would require a peek method be added to the - // message-queue (to view multiple future messages at once) - // TODO: Do we handle having handled a message, by DB, yet having rebooted before `ack`ing it? - // Does the processor? - let msg = processors.recv().await; - - // TODO2: This is slow, and only works as long as a network only has a single Tributary - // (which means there's a lack of multisig rotation) - let spec = { - let mut spec = None; - for tributary in tributaries.values() { - if tributary.spec.set().network == msg.network { - spec = Some(tributary.spec.clone()); - break; - } - } - spec.expect("received message from processor we don't have a tributary for") - }; - - let genesis = spec.genesis(); - // TODO: We probably want to NOP here, not panic? 
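// [Editorial sketch, not part of the patch] The TODO above (repeated in the new code later in
// this hunk) suggests NOPing rather than panicking when a message arrives for a network this
// validator isn't in. Assuming `spec.i` returns `None` in that case, the NOP variant could be:
let Some(my_i) = spec.i(pub_key) else {
  log::warn!("processor message for network we aren't a validator in");
  continue;
};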
- let my_i = spec.i(pub_key).expect("processor message for network we aren't a validator in"); - - let tx = match msg.msg.clone() { - ProcessorMessage::KeyGen(inner_msg) => match inner_msg { - key_gen::ProcessorMessage::Commitments { id, commitments } => { - Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed())) - } - key_gen::ProcessorMessage::Shares { id, mut shares } => { - // Create a MuSig-based machine to inform Substrate of this key generation - let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt); - - let mut tx_shares = Vec::with_capacity(shares.len()); - for i in 1 ..= spec.n() { - let i = Participant::new(i).unwrap(); - if i == my_i { - continue; - } - tx_shares - .push(shares.remove(&i).expect("processor didn't send share for another validator")); - } + let channels = Arc::new(RwLock::new(HashMap::new())); + tokio::spawn({ + let processors = processors.clone(); + let channels = channels.clone(); + async move { + loop { + let channels = channels.clone(); + let ActiveTributary { spec, tributary } = new_tributary.recv().await.unwrap(); + let genesis = spec.genesis(); + tokio::spawn({ + let mut db = db.clone(); + let key = key.clone(); + let serai = serai.clone(); + let mut processors = processors.clone(); + async move { + let (send, mut recv) = mpsc::unbounded_channel(); + // TODO: Support multisig rotation (not per-Tributary yet per-network?) + channels.write().await.insert(spec.set().network, send); - Some(Transaction::DkgShares { - attempt: id.attempt, - shares: tx_shares, - confirmation_nonces: nonces, - signed: Transaction::empty_signed(), - }) - } - key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => { - assert_eq!( - id.set.network, msg.network, - "processor claimed to be a different network than it was for GeneratedKeyPair", - ); - // TODO: Also check the other KeyGenId fields - - // Tell the Tributary the key pair, get back the share for the MuSig signature - let mut txn = db.txn(); - let share = crate::tributary::generated_key_pair::( - &mut txn, - &key, - &spec, - &(Public(substrate_key), network_key.try_into().unwrap()), - id.attempt, - ); - txn.commit(); - - match share { - Ok(share) => { - Some(Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed())) - } - Err(p) => todo!("participant {p:?} sent invalid DKG confirmation preprocesses"), - } - } - }, - ProcessorMessage::Sign(msg) => match msg { - sign::ProcessorMessage::Preprocess { id, preprocess } => { - if id.attempt == 0 { - let mut txn = db.txn(); - MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); - txn.commit(); - - None - } else { - Some(Transaction::SignPreprocess(SignData { - plan: id.id, - attempt: id.attempt, - data: preprocess, - signed: Transaction::empty_signed(), - })) - } - } - sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData { - plan: id.id, - attempt: id.attempt, - data: share, - signed: Transaction::empty_signed(), - })), - sign::ProcessorMessage::Completed { key: _, id, tx } => { - let r = Zeroizing::new(::F::random(&mut OsRng)); - #[allow(non_snake_case)] - let R = ::generator() * r.deref(); - let mut tx = Transaction::SignCompleted { - plan: id, - tx_hash: tx, - first_signer: pub_key, - signature: SchnorrSignature { R, s: ::F::ZERO }, - }; - let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge()); - match &mut tx { - Transaction::SignCompleted { signature, .. 
} => { - *signature = signed; - } - _ => unreachable!(), - } - Some(tx) - } - }, - ProcessorMessage::Coordinator(inner_msg) => match inner_msg { - coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => { - assert_eq!( - network, msg.network, - "processor claimed to be a different network than it was for SubstrateBlockAck", - ); - - // Safe to use its own txn since this is static and just needs to be written before we - // provide SubstrateBlock - let mut txn = db.txn(); - // TODO: This needs to be scoped per multisig - TributaryDb::::set_plan_ids(&mut txn, genesis, block, &plans); - txn.commit(); - - Some(Transaction::SubstrateBlock(block)) - } - coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => { - log::info!( - "informed of batch (sign ID {}, attempt {}) for block {}", - hex::encode(id.id), - id.attempt, - hex::encode(block), - ); - // If this is the first attempt instance, wait until we synchronize around the batch - // first - if id.attempt == 0 { - // Save the preprocess to disk so we can publish it later - // This is fine to use its own TX since it's static and just needs to be written - // before this message finishes it handling (or with this message's finished handling) - let mut txn = db.txn(); - MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); - txn.commit(); - - Some(Transaction::Batch(block.0, id.id)) - } else { - Some(Transaction::BatchPreprocess(SignData { - plan: id.id, - attempt: id.attempt, - data: preprocess, - signed: Transaction::empty_signed(), - })) - } - } - coordinator::ProcessorMessage::BatchShare { id, share } => { - Some(Transaction::BatchShare(SignData { - plan: id.id, - attempt: id.attempt, - data: share.to_vec(), - signed: Transaction::empty_signed(), - })) - } - }, - ProcessorMessage::Substrate(inner_msg) => match inner_msg { - processor_messages::substrate::ProcessorMessage::Update { batch } => { - assert_eq!( - batch.batch.network, msg.network, - "processor sent us a batch for a different network than it was for", - ); - // TODO: Check this key's key pair's substrate key is authorized to publish batches - - // Save this batch to the disk - MainDb::new(&mut db).save_batch(batch); - - /* - Use a dedicated task to publish batches due to the latency potentially incurred. - - This does not guarantee the batch has actually been published when the message is - `ack`ed to message-queue. Accordingly, if we reboot, these batches would be dropped - (as we wouldn't see the `Update` again, triggering our re-attempt to publish). - - The solution to this is to have the task try not to publish the batch which caused it - to be spawned, yet all saved batches which have yet to published. This does risk having - multiple tasks trying to publish all pending batches, yet these aren't notably complex. - */ - tokio::spawn({ - let mut db = db.clone(); - let serai = serai.clone(); - let network = msg.network; - async move { - // Since we have a new batch, publish all batches yet to be published to Serai - // This handles the edge-case where batch n+1 is signed before batch n is - while let Some(batch) = { - // Get the next-to-execute batch ID - let next = { - let mut first = true; - loop { - if !first { - log::error!( - "couldn't connect to Serai node to get the next batch ID for {network:?}", + loop { + let msg: processors::Message = recv.recv().await.unwrap(); + + // TODO: We probably want to NOP here, not panic? 
+ // TODO: We do have to track produced Batches in order to ensure their integrity + let my_i = + spec.i(pub_key).expect("processor message for network we aren't a validator in"); + + let tx = match msg.msg.clone() { + ProcessorMessage::KeyGen(inner_msg) => match inner_msg { + key_gen::ProcessorMessage::Commitments { id, commitments } => { + Some(Transaction::DkgCommitments( + id.attempt, + commitments, + Transaction::empty_signed(), + )) + } + key_gen::ProcessorMessage::Shares { id, mut shares } => { + // Create a MuSig-based machine to inform Substrate of this key generation + let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt); + + let mut tx_shares = Vec::with_capacity(shares.len()); + for i in 1 ..= spec.n() { + let i = Participant::new(i).unwrap(); + if i == my_i { + continue; + } + tx_shares.push( + shares + .remove(&i) + .expect("processor didn't send share for another validator"), ); - tokio::time::sleep(Duration::from_secs(5)).await; } - first = false; - let Ok(latest_block) = serai.get_latest_block().await else { continue }; - let Ok(last) = - serai.get_last_batch_for_network(latest_block.hash(), network).await - else { - continue; + Some(Transaction::DkgShares { + attempt: id.attempt, + shares: tx_shares, + confirmation_nonces: nonces, + signed: Transaction::empty_signed(), + }) + } + key_gen::ProcessorMessage::GeneratedKeyPair { + id, + substrate_key, + network_key, + } => { + assert_eq!( + id.set.network, msg.network, + "processor claimed to be a different network than it was for GeneratedKeyPair", + ); + // TODO: Also check the other KeyGenId fields + + // Tell the Tributary the key pair, get back the share for the MuSig signature + let mut txn = db.txn(); + let share = crate::tributary::generated_key_pair::( + &mut txn, + &key, + &spec, + &(Public(substrate_key), network_key.try_into().unwrap()), + id.attempt, + ); + txn.commit(); + + match share { + Ok(share) => Some(Transaction::DkgConfirmed( + id.attempt, + share, + Transaction::empty_signed(), + )), + Err(p) => { + todo!("participant {p:?} sent invalid DKG confirmation preprocesses") + } + } + } + }, + ProcessorMessage::Sign(msg) => match msg { + sign::ProcessorMessage::Preprocess { id, preprocess } => { + if id.attempt == 0 { + let mut txn = db.txn(); + MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); + txn.commit(); + + None + } else { + Some(Transaction::SignPreprocess(SignData { + plan: id.id, + attempt: id.attempt, + data: preprocess, + signed: Transaction::empty_signed(), + })) + } + } + sign::ProcessorMessage::Share { id, share } => { + Some(Transaction::SignShare(SignData { + plan: id.id, + attempt: id.attempt, + data: share, + signed: Transaction::empty_signed(), + })) + } + sign::ProcessorMessage::Completed { key: _, id, tx } => { + let r = Zeroizing::new(::F::random(&mut OsRng)); + #[allow(non_snake_case)] + let R = ::generator() * r.deref(); + let mut tx = Transaction::SignCompleted { + plan: id, + tx_hash: tx, + first_signer: pub_key, + signature: SchnorrSignature { R, s: ::F::ZERO }, }; - break if let Some(last) = last { last + 1 } else { 0 }; + let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge()); + match &mut tx { + Transaction::SignCompleted { signature, .. 
} => {
+                        *signature = signed;
+                      }
+                      _ => unreachable!(),
+                    }
+                    Some(tx)
+                  }
+                },
+                ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
+                  coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
+                    assert_eq!(
+                      network, msg.network,
+                      "processor claimed to be a different network than it was for SubstrateBlockAck",
+                    );
+
+                    // Safe to use its own txn since this is static and just needs to be written
+                    // before we provide SubstrateBlock
+                    let mut txn = db.txn();
+                    // TODO: This needs to be scoped per multisig
+                    TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans);
+                    txn.commit();
+
+                    Some(Transaction::SubstrateBlock(block))
+                  }
+                  coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => {
+                    log::info!(
+                      "informed of batch (sign ID {}, attempt {}) for block {}",
+                      hex::encode(id.id),
+                      id.attempt,
+                      hex::encode(block),
+                    );
+                    // If this is the first attempt instance, wait until we synchronize around the
+                    // batch first
+                    if id.attempt == 0 {
+                      // Save the preprocess to disk so we can publish it later
+                      // This is fine to use its own TX since it's static and just needs to be
+                      // written before this message finishes its handling (or with this message's
+                      // finished handling)
+                      let mut txn = db.txn();
+                      MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
+                      txn.commit();
+
+                      Some(Transaction::Batch(block.0, id.id))
+                    } else {
+                      Some(Transaction::BatchPreprocess(SignData {
+                        plan: id.id,
+                        attempt: id.attempt,
+                        data: preprocess,
+                        signed: Transaction::empty_signed(),
+                      }))
+                    }
+                  }
+                  coordinator::ProcessorMessage::BatchShare { id, share } => {
+                    Some(Transaction::BatchShare(SignData {
+                      plan: id.id,
+                      attempt: id.attempt,
+                      data: share.to_vec(),
+                      signed: Transaction::empty_signed(),
+                    }))
+                  }
+                },
+                ProcessorMessage::Substrate(inner_msg) => match inner_msg {
+                  processor_messages::substrate::ProcessorMessage::Update { batch } => {
+                    assert_eq!(
+                      batch.batch.network, msg.network,
+                      "processor sent us a batch for a different network than it was for",
+                    );
+                    // TODO: Check this key's key pair's substrate key is authorized to publish
+                    // batches
+
+                    // Save this batch to the disk
+                    MainDb::new(&mut db).save_batch(batch);
+
+                    /*
+                      Use a dedicated task to publish batches due to the latency potentially
+                      incurred.
+
+                      This does not guarantee the batch has actually been published when the
+                      message is `ack`ed to message-queue. Accordingly, if we reboot, these batches
+                      would be dropped (as we wouldn't see the `Update` again, triggering our
+                      re-attempt to publish).
+
+                      The solution to this is to have the task try to publish not only the batch
+                      which caused it to be spawned, but all saved batches which have yet to be
+                      published. This does risk having multiple tasks trying to publish all pending
+                      batches, yet these aren't notably complex.
+                    */
+                    tokio::spawn({
+                      let mut db = db.clone();
+                      let serai = serai.clone();
+                      let network = msg.network;
+                      async move {
+                        // Since we have a new batch, publish all batches yet to be published to
+                        // Serai
+                        // This handles the edge-case where batch n+1 is signed before batch n is
+                        while let Some(batch) = {
+                          // Get the next-to-execute batch ID
+                          let next = {
+                            let mut first = true;
+                            loop {
+                              if !first {
+                                log::error!(
+                                  "{} {network:?}",
+                                  "couldn't connect to Serai node to get the next batch ID for",
+                                );
+                                tokio::time::sleep(Duration::from_secs(5)).await;
+                              }
+                              first = false;
+
+                              let Ok(latest_block) = serai.get_latest_block().await else {
+                                continue;
+                              };
+                              let Ok(last) = serai
+                                .get_last_batch_for_network(latest_block.hash(), network)
+                                .await
+                              else {
+                                continue;
+                              };
+                              break if let Some(last) = last { last + 1 } else { 0 };
+                            }
+                          };
+
+                          // If we have this batch, attempt to publish it
+                          MainDb::new(&mut db).batch(network, next)
+                        } {
+                          let id = batch.batch.id;
+                          let block = batch.batch.block;
+
+                          let tx = Serai::execute_batch(batch);
+                          // This publish may fail if this transaction already exists in the
+                          // mempool, which is possible, or if this batch was already executed
+                          // on-chain
+                          // Either case will have eventual resolution and be handled by the above
+                          // check on if this batch should execute
+                          if serai.publish(&tx).await.is_ok() {
+                            log::info!(
+                              "published batch {network:?} {id} (block {})",
+                              hex::encode(block)
+                            );
+                          }
+                        }
+                      }
+                    });
+
+                    None
+                  }
+                },
+              };
+
+              // If this created a transaction, publish it
+              if let Some(mut tx) = tx {
+                log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
+
+                match tx.kind() {
+                  TransactionKind::Provided(_) => {
+                    log::trace!("providing transaction {}", hex::encode(tx.hash()));
+                    let res = tributary.provide_transaction(tx).await;
+                    if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
+                      panic!("provided an invalid transaction: {res:?}");
+                    }
+                  }
+                  TransactionKind::Unsigned => {
+                    log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
+                    // Ignores the result since we can't differentiate already in-mempool from
+                    // already on-chain from invalid
+                    // TODO: Don't ignore the result
+                    tributary.add_transaction(tx).await;
+                  }
+                  TransactionKind::Signed(_) => {
+                    log::trace!(
+                      "getting next nonce for Tributary TX in response to processor message"
+                    );
+
+                    let nonce = loop {
+                      let Some(nonce) = NonceDecider::<D>::nonce(&db, genesis, &tx)
+                        .expect("signed TX didn't have nonce")
+                      else {
+                        // This can be None if:
+                        // 1) We scanned the relevant transaction(s) in a Tributary block
+                        // 2) The processor was sent a message and responded
+                        // 3) The Tributary TXN has yet to be committed
+                        log::warn!(
+                          "nonce has yet to be saved for processor-instigated transaction"
+                        );
+                        sleep(Duration::from_millis(100)).await;
+                        continue;
+                      };
+                      break nonce;
+                    };
+                    tx.sign(&mut OsRng, genesis, &key, nonce);
+
+                    publish_signed_transaction(&tributary, tx).await;
+                  }
+                }
+              }
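// [Editorial sketch, not part of the patch] The catch-up strategy described in the comment
// block above, reduced to its essentials: walk forward from the next batch ID Serai reports
// and publish every locally saved batch, not only the one which triggered the task.
// `next_batch_id` is a hypothetical helper standing in for the retry loop inlined above.
async fn publish_pending_batches<D: Db>(db: &mut D, serai: &Serai, network: NetworkId) {
  // Next batch ID to execute: the last batch on-chain for this network, plus one
  let mut next = next_batch_id(serai, network).await;
  while let Some(batch) = MainDb::new(db).batch(network, next) {
    // A failed publish means already-in-mempool or already-on-chain; both resolve on a
    // later pass, so the result is intentionally ignored
    let _ = serai.publish(&Serai::execute_batch(batch)).await;
    next += 1;
  }
}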
-                };
-
-                // If we have this batch, attempt to publish it
-                MainDb::new(&mut db).batch(network, next)
-              } {
-                let id = batch.batch.id;
-                let block = batch.batch.block;
-
-                let tx = Serai::execute_batch(batch);
-                // This publish may fail if this transactions already exists in the mempool, which
-                // is possible, or if this batch was already executed on-chain
-                // Either case will have eventual resolution and be handled by the above check on
-                // if this block should execute
-                if serai.publish(&tx).await.is_ok() {
-                  log::info!("published batch {network:?} {id} (block {})", hex::encode(block));
-                }
-              }
-            }
-          });
-
-          None
-        }
-      },
-    };
-
-    // If this created a transaction, publish it
-    if let Some(mut tx) = tx {
-      log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
-      let Some(tributary) = tributaries.get(&genesis) else {
-        // TODO: This can happen since Substrate tells the Processor to generate commitments
-        // at the same time it tells the Tributary to be created
-        // There's no guarantee the Tributary will have been created though
-        panic!("processor is operating on tributary we don't have");
-      };
-      let tributary = &tributary.tributary;
-
-      match tx.kind() {
-        TransactionKind::Provided(_) => {
-          log::trace!("providing transaction {}", hex::encode(tx.hash()));
-          let res = tributary.provide_transaction(tx).await;
-          if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
-            panic!("provided an invalid transaction: {res:?}");
-          }
-        }
-        TransactionKind::Unsigned => {
-          log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
-          // Ignores the result since we can't differentiate already in-mempool from already
-          // on-chain from invalid
-          // TODO: Don't ignore the result
-          tributary.add_transaction(tx).await;
-        }
-        TransactionKind::Signed(_) => {
-          log::trace!("getting next nonce for Tributary TX in response to processor message");
-
-          let nonce = loop {
-            let Some(nonce) =
-              NonceDecider::<D>::nonce(&db, genesis, &tx).expect("signed TX didn't have nonce")
-            else {
-              // This can be None if:
-              // 1) We scanned the relevant transaction(s) in a Tributary block
-              // 2) The processor was sent a message and responded
-              // 3) The Tributary TXN has yet to be committed
-              log::warn!("nonce has yet to be saved for processor-instigated transaction");
-              sleep(Duration::from_millis(100)).await;
-              continue;
-            };
-            break nonce;
-          };
-          tx.sign(&mut OsRng, genesis, &key, nonce);
-
-          publish_signed_transaction(tributary, tx).await;
-        }
-      }
-    }
-
-    processors.ack(msg).await;
-  }
+              processors.ack(msg).await;
+            }
+          }
+        });
+      }
+    }
+  });
+
+  let mut last_msg = None;
+  loop {
+    // TODO: We dispatch this to an async task per-processor, yet we don't move to the next
+    // message, as all processor messages are shoved into a global queue.
+    // Modify message-queue to offer per-sender queues, not per-receiver.
+    // Alternatively, a peek method with local delineation of handled messages would work.
+
+    // TODO: Do we handle having handled a message, by DB, yet having rebooted before `ack`ing it?
+    // Does the processor?
+    let msg = processors.recv().await;
+    if last_msg == Some(msg.id) {
+      sleep(Duration::from_secs(1)).await;
+      continue;
+    }
+    last_msg = Some(msg.id);
+
+    // TODO: Race conditions with above tributary availability?
+    // TODO: How does this hold up to multisig rotation?
+ if let Some(channel) = channels.read().await.get(&msg.network) { + channel.send(msg).unwrap(); + } else { + log::warn!("received processor message for network we don't have a channel for"); + } } } @@ -893,7 +938,7 @@ pub async fn run( loop { match new_tributary_listener_1.recv().await { Ok(tributary) => { - tributaries.write().await.insert(tributary.spec.genesis(), tributary); + tributaries.write().await.insert(tributary.spec.genesis(), tributary.tributary); } Err(broadcast::error::RecvError::Lagged(_)) => { panic!("recognized_id lagged to handle new_tributary") @@ -943,9 +988,10 @@ pub async fn run( let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { + // TODO: This may happen if the task above is simply slow panic!("tributary we don't have came to consensus on an Batch"); }; - publish_signed_transaction(&tributary.tributary, tx).await; + publish_signed_transaction(tributary, tx).await; } } };
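// [Editorial note, not part of the patch] In the channel dispatch within handle_processors
// above, a message for a network without a registered channel is neither dispatched nor
// acked, yet `last_msg` was already set to its ID, so its redeliveries are then skipped by
// the dedup check. A hedged alternative (assuming unacked messages are redelivered by
// message-queue) is to record `last_msg` only once dispatch succeeds:
if let Some(channel) = channels.read().await.get(&msg.network) {
  last_msg = Some(msg.id);
  channel.send(msg).unwrap();
} else {
  // last_msg is deliberately left untouched so the redelivered message is retried
  log::warn!("received processor message for network we don't have a channel for");
}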