From 0440e60645cf30ea64679fd63128a265ecc594bd Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 25 Sep 2023 17:15:36 -0400 Subject: [PATCH] Move heartbeat_tributaries from Tributary to TributaryReader Reduces contention. --- coordinator/src/main.rs | 18 +++++++++--------- coordinator/tributary/src/blockchain.rs | 12 ++++++++---- coordinator/tributary/src/lib.rs | 8 +++++++- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index ddb0bbfe9..ccc58c24d 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -300,12 +300,16 @@ pub async fn heartbeat_tributaries( let ten_blocks_of_time = Duration::from_secs((10 * Tributary::::block_time()).into()); + let mut readers = vec![]; + for tributary in tributaries.read().await.values() { + readers.push(tributary.tributary.read().await.reader()); + } + loop { - for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() { - let tributary = tributary.read().await; - let tip = tributary.tip().await; - let block_time = SystemTime::UNIX_EPOCH + - Duration::from_secs(tributary.reader().time_of_block(&tip).unwrap_or(0)); + for tributary in &readers { + let tip = tributary.tip(); + let block_time = + SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0)); // Only trigger syncing if the block is more than a minute behind if SystemTime::now() > (block_time + Duration::from_secs(60)) { @@ -777,10 +781,6 @@ pub async fn handle_processors( }; tx.sign(&mut OsRng, genesis, &key, nonce); - let Some(tributary) = tributaries.get(&genesis) else { - panic!("tributary we don't have came to consensus on an Batch"); - }; - let tributary = tributary.tributary.read().await; publish_transaction(&tributary, tx).await; } } diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index 78c2ca2bf..9febf4d0d 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -27,8 +27,8 @@ pub(crate) struct Blockchain { } impl Blockchain { - fn tip_key(&self) -> Vec { - D::key(b"tributary_blockchain", b"tip", self.genesis) + fn tip_key(genesis: [u8; 32]) -> Vec { + D::key(b"tributary_blockchain", b"tip", genesis) } fn block_number_key(&self) -> Vec { D::key(b"tributary_blockchain", b"block_number", self.genesis) @@ -80,7 +80,7 @@ impl Blockchain { if let Some((block_number, tip)) = { let db = res.db.as_ref().unwrap(); - db.get(res.block_number_key()).map(|number| (number, db.get(res.tip_key()).unwrap())) + db.get(res.block_number_key()).map(|number| (number, db.get(Self::tip_key(genesis)).unwrap())) } { res.block_number = u32::from_le_bytes(block_number.try_into().unwrap()); res.tip.copy_from_slice(&tip); @@ -132,6 +132,10 @@ impl Blockchain { db.get(Self::block_after_key(&genesis, block)).map(|bytes| bytes.try_into().unwrap()) } + pub(crate) fn tip_from_db(db: &D, genesis: [u8; 32]) -> [u8; 32] { + db.get(Self::tip_key(genesis)).map(|bytes| bytes.try_into().unwrap()).unwrap_or(genesis) + } + pub(crate) fn add_transaction( &mut self, internal: bool, @@ -226,7 +230,7 @@ impl Blockchain { let mut txn = db.txn(); self.tip = block.hash(); - txn.put(self.tip_key(), self.tip); + txn.put(Self::tip_key(self.genesis), self.tip); self.block_number += 1; txn.put(self.block_number_key(), self.block_number.to_le_bytes()); diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index ebb7b165c..64383a5a9 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -344,7 +344,8 @@ impl TributaryReader { pub fn genesis(&self) -> [u8; 32] { self.1 } - // Since these values are static, they can be safely read from the database without lock + + // Since these values are static once set, they can be safely read from the database without lock // acquisition pub fn block(&self, hash: &[u8; 32]) -> Option> { Blockchain::::block_from_db(&self.0, self.1, hash) @@ -363,4 +364,9 @@ impl TributaryReader { .commit(hash) .map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap().end_time) } + + // This isn't static, yet can be read with only minor discrepancy risks + pub fn tip(&self) -> [u8; 32] { + Blockchain::::tip_from_db(&self.0, self.1) + } }