diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 419330af9..308d629ba 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -28,13 +28,11 @@ use message_queue::{Service, client::MessageQueue};
 use futures::stream::StreamExt;
 use tokio::{
-  sync::{RwLock, mpsc},
+  sync::{RwLock, mpsc, broadcast},
   time::sleep,
 };
 
-use ::tributary::{
-  ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary, TributaryReader,
-};
+use ::tributary::{ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary};
 
 mod tributary;
 use crate::tributary::{
@@ -57,22 +55,21 @@ mod substrate;
 #[cfg(test)]
 pub mod tests;
 
+#[derive(Clone)]
 pub struct ActiveTributary<D: Db, P: P2p> {
   pub spec: TributarySpec,
-  pub tributary: Arc<RwLock<Tributary<D, Transaction, P>>>,
+  pub tributary: Arc<Tributary<D, Transaction, P>>,
 }
 
-type Tributaries<D, P> = HashMap<[u8; 32], ActiveTributary<D, P>>;
-
-// Adds a tributary into the specified HashMap
+// Creates a new tributary and sends it to all listeners of the new-tributary channel
 async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
   db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
   processors: &Pro,
   p2p: P,
-  tributaries: &mut Tributaries<D, P>,
+  tributaries: &broadcast::Sender<ActiveTributary<D, P>>,
   spec: TributarySpec,
-) -> TributaryReader<D, Transaction> {
+) {
   log::info!("adding tributary {:?}", spec.set());
 
   let tributary = Tributary::<_, Transaction, _>::new(
@@ -110,14 +107,10 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
   )
   .await;
 
-  let reader = tributary.reader();
-
-  tributaries.insert(
-    tributary.genesis(),
-    ActiveTributary { spec, tributary: Arc::new(RwLock::new(tributary)) },
-  );
-
-  reader
+  tributaries
+    .send(ActiveTributary { spec, tributary: Arc::new(tributary) })
+    .map_err(|_| "all ActiveTributary recipients closed")
+    .unwrap();
 }
 
 pub async fn scan_substrate<D: Db, Pro: Processors>(
   db: D,
@@ -125,7 +118,7 @@
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
   processors: Pro,
   serai: Arc<Serai>,
-  new_tributary_channel: mpsc::UnboundedSender<TributarySpec>,
+  new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
 ) {
   log::info!("scanning substrate");
 
@@ -185,7 +178,7 @@
       // Add it to the queue
       // If we reboot before this is read from the queue, the fact it was saved to the database
       // means it'll be handled on reboot
-      new_tributary_channel.send(spec).unwrap();
+      new_tributary_spec.send(spec).unwrap();
     },
     &processors,
     &serai,
@@ -202,7 +195,7 @@
   }
 }
 
-#[allow(clippy::too_many_arguments, clippy::type_complexity)]
+#[allow(clippy::type_complexity)]
 pub async fn scan_tributaries<
   D: Db,
   Pro: Processors,
   P: P2p,
@@ -216,38 +209,25 @@ pub async fn scan_tributaries<
   p2p: P,
   processors: Pro,
   serai: Arc<Serai>,
-  tributaries: Arc<RwLock<Tributaries<D, P>>>,
-  mut new_tributary_channel: mpsc::UnboundedReceiver<TributarySpec>,
+  mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
 ) {
   log::info!("scanning tributaries");
 
-  let mut tributary_readers = vec![];
-  for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
-    tributary_readers.push((spec.clone(), tributary.read().await.reader()));
-  }
-
   // Handle new Tributary blocks
+  let mut tributary_readers = vec![];
   let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
   loop {
-    // The following handle_new_blocks function may take an arbitrary amount of time
-    // Accordingly, it may take a long time to acquire a write lock on the tributaries table
-    // By definition of new_tributary_channel, we allow tributaries to be 'added' almost
-    // immediately, meaning the Substrate scanner won't become blocked on this
-    {
-      while let Ok(spec) = new_tributary_channel.try_recv() {
-        let reader = add_tributary(
-          raw_db.clone(),
-          key.clone(),
-          &processors,
-          p2p.clone(),
-          // This is a short-lived write acquisition, which is why it should be fine
-          &mut *tributaries.write().await,
-          spec.clone(),
-        )
-        .await;
-
-        tributary_readers.push((spec, reader));
+    while let Ok(ActiveTributary { spec, tributary }) = {
+      match new_tributary.try_recv() {
+        Ok(tributary) => Ok(tributary),
+        Err(broadcast::error::TryRecvError::Empty) => Err(()),
+        Err(broadcast::error::TryRecvError::Lagged(_)) => {
+          panic!("scan_tributaries lagged to handle new_tributary")
+        }
+        Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"),
       }
+    } {
+      tributary_readers.push((spec, tributary.reader()));
     }
 
     for (spec, reader) in &tributary_readers {
@@ -296,15 +276,24 @@
 
 pub async fn heartbeat_tributaries<D: Db, P: P2p>(
   p2p: P,
-  tributaries: Arc<RwLock<Tributaries<D, P>>>,
+  mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
 ) {
   let ten_blocks_of_time =
     Duration::from_secs((10 * Tributary::<D, Transaction, P>::block_time()).into());
 
+  let mut readers = vec![];
   loop {
-    let mut readers = vec![];
-    for tributary in tributaries.read().await.values() {
-      readers.push(tributary.tributary.read().await.reader());
+    while let Ok(ActiveTributary { spec: _, tributary }) = {
+      match new_tributary.try_recv() {
+        Ok(tributary) => Ok(tributary),
+        Err(broadcast::error::TryRecvError::Empty) => Err(()),
+        Err(broadcast::error::TryRecvError::Lagged(_)) => {
+          panic!("heartbeat lagged to handle new_tributary")
+        }
+        Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"),
+      }
+    } {
+      readers.push(tributary.reader());
     }
 
     for tributary in &readers {
@@ -337,8 +326,27 @@
 pub async fn handle_p2p<D: Db, P: P2p>(
   our_key: <Ristretto as Ciphersuite>::G,
   p2p: P,
-  tributaries: Arc<RwLock<Tributaries<D, P>>>,
+  mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
 ) {
+  // TODO: Merge this into the below loop. We don't need an extra task here
+  let tributaries = Arc::new(RwLock::new(HashMap::new()));
+  tokio::spawn({
+    let tributaries = tributaries.clone();
+    async move {
+      loop {
+        match new_tributary.recv().await {
+          Ok(tributary) => {
+            tributaries.write().await.insert(tributary.spec.genesis(), tributary);
+          }
+          Err(broadcast::error::RecvError::Lagged(_)) => {
+            panic!("handle_p2p lagged to handle new_tributary")
+          }
+          Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
+        }
+      }
+    }
+  });
+
   loop {
     let mut msg = p2p.receive().await;
     // Spawn a dedicated task to handle this message, ensuring any singularly latent message
@@ -359,7 +367,7 @@
         };
 
         log::trace!("handling message for tributary {:?}", tributary.spec.set());
-        if tributary.tributary.read().await.handle_message(&msg.msg).await {
+        if tributary.tributary.handle_message(&msg.msg).await {
          P2p::broadcast(&p2p, msg.kind, msg.msg).await;
        }
      }
@@ -378,7 +386,7 @@
          log::debug!("received heartbeat message for unknown network");
          return;
        };
-        let tributary_read = tributary.tributary.read().await;
+        let tributary_read = &tributary.tributary;
 
        /*
        // Have sqrt(n) nodes reply with the blocks
@@ -417,7 +425,6 @@
          log::debug!("received heartbeat and selected to respond");
 
          let reader = tributary_read.reader();
-          drop(tributary_read);
 
          let mut latest = msg.msg[.. 32].try_into().unwrap();
          while let Some(next) = reader.block_after(&latest) {
@@ -446,7 +453,7 @@
          return;
        };
 
-        let res = tributary.tributary.read().await.sync_block(block, msg.msg).await;
+        let res = tributary.tributary.sync_block(block, msg.msg).await;
        log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res);
      }
    }
@@ -483,10 +490,29 @@
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
   serai: Arc<Serai>,
   mut processors: Pro,
-  tributaries: Arc<RwLock<Tributaries<D, P>>>,
+  mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
 ) {
   let pub_key = Ristretto::generator() * key.deref();
 
+  // TODO: Merge this into the below loop. We don't need an extra task here
+  let tributaries = Arc::new(RwLock::new(HashMap::new()));
+  tokio::spawn({
+    let tributaries = tributaries.clone();
+    async move {
+      loop {
+        match new_tributary.recv().await {
+          Ok(tributary) => {
+            tributaries.write().await.insert(tributary.spec.genesis(), tributary);
+          }
+          Err(broadcast::error::RecvError::Lagged(_)) => {
+            panic!("handle_processors lagged to handle new_tributary")
+          }
+          Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
+        }
+      }
+    }
+  });
+
   loop {
     // TODO: Dispatch this message to a task dedicated to handling this processor, preventing one
     // processor from holding up all the others. This would require a peek method be added to the
@@ -738,15 +764,13 @@
     if let Some(mut tx) = tx {
       log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
       let tributaries = tributaries.read().await;
-      log::trace!("read global tributaries");
       let Some(tributary) = tributaries.get(&genesis) else {
         // TODO: This can happen since Substrate tells the Processor to generate commitments
         // at the same time it tells the Tributary to be created
         // There's no guarantee the Tributary will have been created though
         panic!("processor is operating on tributary we don't have");
       };
-      let tributary = tributary.tributary.read().await;
-      log::trace!("read specific tributary");
+      let tributary = &tributary.tributary;
 
       match tx.kind() {
         TransactionKind::Provided(_) => {
@@ -782,7 +806,7 @@
           };
 
           tx.sign(&mut OsRng, genesis, &key, nonce);
-          publish_signed_transaction(&tributary, tx).await;
+          publish_signed_transaction(tributary, tx).await;
         }
       }
     }
@@ -800,7 +824,11 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
 ) {
   let serai = Arc::new(serai);
 
-  let (new_tributary_channel_send, new_tributary_channel_recv) = mpsc::unbounded_channel();
+  let (new_tributary_spec_send, mut new_tributary_spec_recv) = mpsc::unbounded_channel();
+
+  // Reload active tributaries from the database
+  for spec in MainDb::new(&mut raw_db).active_tributaries().1 {
+    new_tributary_spec_send.send(spec).unwrap();
+  }
 
   // Handle new Substrate blocks
   tokio::spawn(scan_substrate(
@@ -808,33 +836,64 @@
     key.clone(),
     processors.clone(),
     serai.clone(),
-    new_tributary_channel_send,
+    new_tributary_spec_send,
   ));
 
   // Handle the Tributaries
 
-  // Arc so this can be shared between the Tributary scanner task and the P2P task
-  // Write locks on this may take a while to acquire
-  let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary<D, P>>::new()));
+  // This should be large enough for an entire rotation of all tributaries
+  // If it's too small, the coordinator will fail to boot, which is a decent sanity check
+  let (new_tributary, mut new_tributary_listener_1) = broadcast::channel(32);
+  let new_tributary_listener_2 = new_tributary.subscribe();
+  let new_tributary_listener_3 = new_tributary.subscribe();
+  let new_tributary_listener_4 = new_tributary.subscribe();
+  let new_tributary_listener_5 = new_tributary.subscribe();
 
-  // Reload active tributaries from the database
-  for spec in MainDb::new(&mut raw_db).active_tributaries().1 {
-    let _ = add_tributary(
-      raw_db.clone(),
-      key.clone(),
-      &processors,
-      p2p.clone(),
-      &mut *tributaries.write().await,
-      spec,
-    )
-    .await;
-  }
+  // Spawn a task to further add Tributaries as needed
+  tokio::spawn({
+    let raw_db = raw_db.clone();
+    let key = key.clone();
+    let processors = processors.clone();
+    let p2p = p2p.clone();
+    async move {
+      loop {
+        let spec = new_tributary_spec_recv.recv().await.unwrap();
+        add_tributary(
+          raw_db.clone(),
+          key.clone(),
+          &processors,
+          p2p.clone(),
+          &new_tributary,
+          spec.clone(),
+        )
+        .await;
+      }
+    }
+  });
 
   // When we reach synchrony on an event requiring signing, send our preprocess for it
   let recognized_id = {
     let raw_db = raw_db.clone();
     let key = key.clone();
-    let tributaries = tributaries.clone();
+
+    let tributaries = Arc::new(RwLock::new(HashMap::new()));
+    tokio::spawn({
+      let tributaries = tributaries.clone();
+      async move {
+        loop {
+          match new_tributary_listener_1.recv().await {
+            Ok(tributary) => {
+              tributaries.write().await.insert(tributary.spec.genesis(), tributary);
+            }
+            Err(broadcast::error::RecvError::Lagged(_)) => {
+              panic!("recognized_id lagged to handle new_tributary")
+            }
+            Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
+          }
+        }
+      }
+    });
+
     move |network, genesis, id_type, id, nonce| {
       let raw_db = raw_db.clone();
       let key = key.clone();
@@ -876,8 +935,7 @@
         let Some(tributary) = tributaries.get(&genesis) else {
           panic!("tributary we don't have came to consensus on an Batch");
         };
-        let tributary = tributary.tributary.read().await;
-        publish_signed_transaction(&tributary, tx).await;
+        publish_signed_transaction(&tributary.tributary, tx).await;
       }
     }
   };
@@ -892,20 +950,19 @@
       p2p.clone(),
       processors.clone(),
       serai.clone(),
-      tributaries.clone(),
-      new_tributary_channel_recv,
+      new_tributary_listener_2,
     ));
   }
 
   // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block
   // in a while (presumably because we're behind)
-  tokio::spawn(heartbeat_tributaries(p2p.clone(), tributaries.clone()));
+  tokio::spawn(heartbeat_tributaries(p2p.clone(), new_tributary_listener_3));
 
   // Handle P2P messages
-  tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries.clone()));
+  tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, new_tributary_listener_4));
 
   // Handle all messages from processors
-  handle_processors(raw_db, key, serai, processors, tributaries).await;
+  handle_processors(raw_db, key, serai, processors, new_tributary_listener_5).await;
 }
 
 #[tokio::main]
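[Review aside, not part of the patch: the main.rs changes above replace the shared Arc<RwLock<HashMap<..>>> registry with a tokio::sync::broadcast channel, so each task receives its own copy of every ActiveTributary instead of contending on one lock. A minimal, self-contained sketch of the two consumption styles the patch uses follows; the names (Registration, tracker, scanner) are illustrative, not Serai identifiers.

use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct Registration(u64); // stands in for ActiveTributary

#[tokio::main]
async fn main() {
  // One Sender, any number of Receivers; each Receiver sees every value sent after it subscribed
  let (send, mut tracker) = broadcast::channel::<Registration>(32);
  let mut scanner = send.subscribe();

  send.send(Registration(1)).map_err(|_| "all recipients closed").unwrap();

  // Style 1: a long-lived tracker task blocks on recv().await, as handle_p2p's tracker does
  assert_eq!(tracker.recv().await.unwrap().0, 1);

  // Style 2: a work loop drains whatever is pending without blocking, as scan_tributaries does
  loop {
    match scanner.try_recv() {
      Ok(reg) => println!("registering {reg:?}"),
      Err(broadcast::error::TryRecvError::Empty) => break,
      Err(broadcast::error::TryRecvError::Lagged(_)) => panic!("receiver lagged"),
      Err(broadcast::error::TryRecvError::Closed) => panic!("sender closed"),
    }
  }
}]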
diff --git a/coordinator/src/tests/tributary/handle_p2p.rs b/coordinator/src/tests/tributary/handle_p2p.rs
index becf5059f..87576dd8f 100644
--- a/coordinator/src/tests/tributary/handle_p2p.rs
+++ b/coordinator/src/tests/tributary/handle_p2p.rs
@@ -1,11 +1,11 @@
 use core::time::Duration;
-use std::{sync::Arc, collections::HashMap};
+use std::sync::Arc;
 
 use rand_core::OsRng;
 
 use ciphersuite::{Ciphersuite, Ristretto};
 
-use tokio::{sync::RwLock, time::sleep};
+use tokio::{sync::broadcast, time::sleep};
 
 use serai_db::MemDb;
 
@@ -27,18 +27,18 @@ async fn handle_p2p_test() {
 
   let mut tributaries = new_tributaries(&keys, &spec).await;
 
+  let mut tributary_senders = vec![];
   let mut tributary_arcs = vec![];
   for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() {
-    let tributary = Arc::new(RwLock::new(tributary));
+    let tributary = Arc::new(tributary);
     tributary_arcs.push(tributary.clone());
-    tokio::spawn(handle_p2p(
-      Ristretto::generator() * *keys[i],
-      p2p,
-      Arc::new(RwLock::new(HashMap::from([(
-        spec.genesis(),
-        ActiveTributary { spec: spec.clone(), tributary },
-      )]))),
-    ));
+    let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
+    tokio::spawn(handle_p2p(Ristretto::generator() * *keys[i], p2p, new_tributary_recv));
+    new_tributary_send
+      .send(ActiveTributary { spec: spec.clone(), tributary })
+      .map_err(|_| "failed to send ActiveTributary")
+      .unwrap();
+    tributary_senders.push(new_tributary_send);
   }
   let tributaries = tributary_arcs;
 
@@ -46,22 +46,22 @@ async fn handle_p2p_test() {
   // We don't wait one block of time as we may have missed the chance for this block
   sleep(Duration::from_secs((2 * Tributary::<MemDb, Transaction, LocalP2p>::block_time()).into()))
     .await;
-  let tip = tributaries[0].read().await.tip().await;
+  let tip = tributaries[0].tip().await;
   assert!(tip != spec.genesis());
 
   // Sleep one second to make sure this block propagates
   sleep(Duration::from_secs(1)).await;
   // Make sure every tributary has it
   for tributary in &tributaries {
-    assert!(tributary.read().await.reader().block(&tip).is_some());
+    assert!(tributary.reader().block(&tip).is_some());
   }
 
   // Then after another block of time, we should have yet another new block
   sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
-  let new_tip = tributaries[0].read().await.tip().await;
+  let new_tip = tributaries[0].tip().await;
   assert!(new_tip != tip);
   sleep(Duration::from_secs(1)).await;
   for tributary in tributaries {
-    assert!(tributary.read().await.reader().block(&new_tip).is_some());
+    assert!(tributary.reader().block(&new_tip).is_some());
   }
 }
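[Review aside, not part of the patch: note the otherwise-unused tributary_senders vector above. A broadcast channel closes once every Sender has been dropped; receivers then drain any buffered values and get Closed, which handle_p2p escalates to a panic. Keeping the senders alive for the test's duration avoids that. A sketch of the behavior, using only the tokio API:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
  let (send, mut recv) = broadcast::channel::<u8>(5);
  send.send(0).unwrap();
  drop(send); // what would happen if the test let its Sender fall out of scope

  // Values buffered before the drop still arrive...
  assert_eq!(recv.recv().await.unwrap(), 0);
  // ...after which the channel reports Closed, which the spawned tasks panic on
  assert!(matches!(recv.recv().await, Err(broadcast::error::RecvError::Closed)));
}]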
diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs
index ced97bd6b..af4bb0c79 100644
--- a/coordinator/src/tests/tributary/sync.rs
+++ b/coordinator/src/tests/tributary/sync.rs
@@ -1,14 +1,11 @@
 use core::time::Duration;
-use std::{
-  sync::Arc,
-  collections::{HashSet, HashMap},
-};
+use std::{sync::Arc, collections::HashSet};
 
 use rand_core::OsRng;
 
 use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
 
-use tokio::{sync::RwLock, time::sleep};
+use tokio::{sync::broadcast, time::sleep};
 
 use serai_db::MemDb;
 
@@ -37,19 +34,20 @@ async fn sync_test() {
   let (syncer_p2p, syncer_tributary) = tributaries.pop().unwrap();
 
   // Have the rest form a P2P net
+  let mut tributary_senders = vec![];
   let mut tributary_arcs = vec![];
   let mut p2p_threads = vec![];
   for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() {
-    let tributary = Arc::new(RwLock::new(tributary));
+    let tributary = Arc::new(tributary);
     tributary_arcs.push(tributary.clone());
-    let thread = tokio::spawn(handle_p2p(
-      Ristretto::generator() * *keys[i],
-      p2p,
-      Arc::new(RwLock::new(HashMap::from([(
-        spec.genesis(),
-        ActiveTributary { spec: spec.clone(), tributary },
-      )]))),
-    ));
+    let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
+    let thread =
+      tokio::spawn(handle_p2p(Ristretto::generator() * *keys[i], p2p, new_tributary_recv));
+    new_tributary_send
+      .send(ActiveTributary { spec: spec.clone(), tributary })
+      .map_err(|_| "failed to send ActiveTributary")
+      .unwrap();
+    tributary_senders.push(new_tributary_send);
     p2p_threads.push(thread);
   }
   let tributaries = tributary_arcs;
@@ -60,14 +58,14 @@
   // propose by our 'offline' validator
   let block_time = u64::from(Tributary::<MemDb, Transaction, LocalP2p>::block_time());
   sleep(Duration::from_secs(3 * block_time)).await;
-  let tip = tributaries[0].read().await.tip().await;
+  let tip = tributaries[0].tip().await;
   assert!(tip != spec.genesis());
 
   // Sleep one second to make sure this block propagates
   sleep(Duration::from_secs(1)).await;
   // Make sure every tributary has it
   for tributary in &tributaries {
-    assert!(tributary.read().await.reader().block(&tip).is_some());
+    assert!(tributary.reader().block(&tip).is_some());
   }
 
   // Now that we've confirmed the other tributaries formed a net without issue, drop the syncer's
@@ -76,31 +74,36 @@
   // Have it join the net
   let syncer_key = Ristretto::generator() * *syncer_key;
-  let syncer_tributary = Arc::new(RwLock::new(syncer_tributary));
-  let syncer_tributaries = Arc::new(RwLock::new(HashMap::from([(
-    spec.genesis(),
-    ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() },
-  )])));
-  tokio::spawn(handle_p2p(syncer_key, syncer_p2p.clone(), syncer_tributaries.clone()));
+  let syncer_tributary = Arc::new(syncer_tributary);
+  let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5);
+  tokio::spawn(handle_p2p(syncer_key, syncer_p2p.clone(), syncer_tributary_recv));
+  syncer_tributary_send
+    .send(ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() })
+    .map_err(|_| "failed to send ActiveTributary to syncer")
+    .unwrap();
 
   // It shouldn't automatically catch up. If it somehow was, our test would be broken
   // Sanity check this
-  let tip = tributaries[0].read().await.tip().await;
+  let tip = tributaries[0].tip().await;
   sleep(Duration::from_secs(2 * block_time)).await;
-  assert!(tributaries[0].read().await.tip().await != tip);
-  assert_eq!(syncer_tributary.read().await.tip().await, spec.genesis());
+  assert!(tributaries[0].tip().await != tip);
+  assert_eq!(syncer_tributary.tip().await, spec.genesis());
 
   // Start the heartbeat protocol
-  tokio::spawn(heartbeat_tributaries(syncer_p2p, syncer_tributaries));
+  let (syncer_heartbeat_tributary_send, syncer_heartbeat_tributary_recv) = broadcast::channel(5);
+  tokio::spawn(heartbeat_tributaries(syncer_p2p, syncer_heartbeat_tributary_recv));
+  syncer_heartbeat_tributary_send
+    .send(ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() })
+    .map_err(|_| "failed to send ActiveTributary to heartbeat")
+    .unwrap();
 
   // The heartbeat is once every 10 blocks
   sleep(Duration::from_secs(10 * block_time)).await;
-  assert!(syncer_tributary.read().await.tip().await != spec.genesis());
+  assert!(syncer_tributary.tip().await != spec.genesis());
 
   // Verify it synced to the tip
   let syncer_tip = {
-    let tributary = tributaries[0].write().await;
-    let syncer_tributary = syncer_tributary.write().await;
+    let tributary = &tributaries[0];
 
     let tip = tributary.tip().await;
     let syncer_tip = syncer_tributary.tip().await;
@@ -114,7 +117,7 @@
   sleep(Duration::from_secs(block_time)).await;
 
   // Verify it's now keeping up
-  assert!(syncer_tributary.read().await.tip().await != syncer_tip);
+  assert!(syncer_tributary.tip().await != syncer_tip);
 
   // Verify it's now participating in consensus
   // Because only `t` validators are used in a commit, take n - t nodes offline
@@ -128,7 +131,6 @@
   // wait for a block
   sleep(Duration::from_secs(block_time)).await;
 
-  let syncer_tributary = syncer_tributary.read().await;
   if syncer_tributary
     .reader()
     .parsed_commit(&syncer_tributary.tip().await)
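[Review aside, not part of the patch: on capacity, 32 in run() and 5 per test channel. A broadcast channel buffers at most `capacity` values; a receiver that falls further behind gets Lagged(n), with n the number of values it missed, on its next receive, and every task in this patch turns Lagged into a panic. That is the "too small, fail to boot" sanity check referenced in main.rs. A sketch of the lag semantics, assuming only the tokio API:

use tokio::sync::broadcast;

fn main() {
  let (send, mut recv) = broadcast::channel::<u32>(2);
  for i in 0 .. 3 {
    // The third send evicts the oldest buffered value, since the receiver hasn't kept up
    send.send(i).unwrap();
  }
  match recv.try_recv() {
    // The receiver learns it skipped one value (message 0)...
    Err(broadcast::error::TryRecvError::Lagged(skipped)) => assert_eq!(skipped, 1),
    other => panic!("expected Lagged, got {other:?}"),
  }
  // ...and resumes from the oldest value still buffered
  assert_eq!(recv.try_recv().unwrap(), 1);
  assert_eq!(recv.try_recv().unwrap(), 2);
}]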