From d3d3bdc828f3c873ad11c94fb1d147e4085d2186 Mon Sep 17 00:00:00 2001 From: akildemir Date: Mon, 15 Jul 2024 14:49:27 +0300 Subject: [PATCH 1/3] fix p2p Reqres protocol --- coordinator/src/p2p.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index ef876f9a8..53a85c00c 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -29,7 +29,7 @@ use libp2p::{ noise, yamux, request_response::{ Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig, - Behaviour as RrBehavior, + Behaviour as RrBehavior, ProtocolSupport, }, gossipsub::{ IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, @@ -297,7 +297,7 @@ impl LibP2p { let throwaway_key_pair = Keypair::generate_ed25519(); let behavior = Behavior { - reqres: { RrBehavior::new([], RrConfig::default()) }, + reqres: { RrBehavior::new([("/coordinator", ProtocolSupport::Full)], RrConfig::default()) }, gossipsub: { let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2; let heartbeats_per_block = From 1404446cd706145aa613756d16b7b3f615c16fc3 Mon Sep 17 00:00:00 2001 From: akildemir Date: Mon, 15 Jul 2024 16:42:25 +0300 Subject: [PATCH 2/3] stabilize tributary chain sync --- coordinator/src/p2p.rs | 77 ++++++++++++++++++++++++++++++------------ 1 file changed, 56 insertions(+), 21 deletions(-) diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 53a85c00c..5d7746f88 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -9,7 +9,7 @@ use std::{ use async_trait::async_trait; use rand_core::{RngCore, OsRng}; -use scale::Encode; +use scale::{Decode, Encode}; use borsh::{BorshSerialize, BorshDeserialize}; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai}; @@ -48,6 +48,9 @@ use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent}; const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024; const LIBP2P_TOPIC: &str = "serai-coordinator"; +// amount of blocks in a minute +const BLOCKS_PER_MINUTE: u32 = 60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000); + #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)] pub struct CosignedBlock { pub network: NetworkId, @@ -173,6 +176,18 @@ pub struct Message { pub msg: Vec, } +#[derive(Clone, Debug, Encode, Decode)] +pub struct BlockCommit { + pub block: Vec, + pub commit: Vec, +} + +#[derive(Clone, Debug, Encode, Decode)] +pub struct HeartbeatBatch { + pub blocks: Vec, + pub timestamp: u64, +} + #[async_trait] pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { type Id: Send + Sync + Clone + Copy + fmt::Debug; @@ -868,7 +883,7 @@ pub async fn handle_p2p_task( let p2p = p2p.clone(); async move { loop { - let Some(mut msg) = recv.recv().await else { + let Some(msg) = recv.recv().await else { // Channel closure happens when the tributary retires break; }; @@ -913,34 +928,54 @@ pub async fn handle_p2p_task( latest = next; } if to_send.len() > 3 { - for next in to_send { - let mut res = reader.block(&next).unwrap().serialize(); - res.extend(reader.commit(&next).unwrap()); - // Also include the timestamp used within the Heartbeat - res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await; + // prepare the batch to sends + let mut blocks = vec![]; + for (i, next) in to_send.iter().enumerate() { + // each batch contains BLOCKS_PER_MINUTE + 1 blocks + if i >= usize::try_from(BLOCKS_PER_MINUTE).unwrap() + 1 { + break; + } + + blocks.push(BlockCommit { + block: reader.block(&next).unwrap().serialize(), + commit: reader.commit(&next).unwrap(), + }); } + let batch = HeartbeatBatch { blocks, timestamp: msg_time }; + + p2p + .send(msg.sender, ReqResMessageKind::Block(genesis), batch.encode()) + .await; } }); } P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => { assert_eq!(msg_genesis, genesis); - let mut msg_ref: &[u8] = msg.msg.as_ref(); - let Ok(block) = Block::::read(&mut msg_ref) else { - log::error!("received block message with an invalidly serialized block"); + // decode the batch + let Ok(batch) = HeartbeatBatch::decode(&mut msg.msg.as_ref()) else { + log::error!( + "received HeartBeatBatch message with an invalidly serialized batch" + ); continue; }; - // Get just the commit - msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); - msg.msg.drain((msg.msg.len() - 8) ..); - - let res = tributary.tributary.sync_block(block, msg.msg).await; - log::debug!( - "received block from {:?}, sync_block returned {}", - msg.sender, - res - ); + + // sync blocks + for bc in batch.blocks { + // TODO: why do we use ReadWrite instead of Encode/Decode for blocks? + // Should we use the same for batches so we can read both at the same time? + let Ok(block) = Block::::read(&mut bc.block.as_slice()) else { + log::error!("received block message with an invalidly serialized block"); + continue; + }; + + let res = tributary.tributary.sync_block(block, bc.commit).await; + log::debug!( + "received block from {:?}, sync_block returned {}", + msg.sender, + res + ); + } } P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => { From df1baecfea92dceae7e01e0155de54521230d51a Mon Sep 17 00:00:00 2001 From: akildemir Date: Tue, 16 Jul 2024 13:02:55 +0300 Subject: [PATCH 3/3] fix pr comments --- coordinator/src/p2p.rs | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 5d7746f88..06b87eb90 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -45,11 +45,19 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p}; use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent}; // Block size limit + 1 KB of space for signatures/metadata -const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024; +const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024; + +const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize = + (tributary::BLOCK_SIZE_LIMIT * BLOCKS_PER_BATCH) + 1024; + const LIBP2P_TOPIC: &str = "serai-coordinator"; -// amount of blocks in a minute -const BLOCKS_PER_MINUTE: u32 = 60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000); +// Amount of blocks in a minute +// We can't use tendermint::TARGET_BLOCK_TIME here to calculate this since that is a u32. +const BLOCKS_PER_MINUTE: usize = 10; + +// Maximum amount of blocks to send in a batch +const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)] pub struct CosignedBlock { @@ -243,8 +251,8 @@ impl RrCodecTrait for RrCodec { let mut len = [0; 4]; io.read_exact(&mut len).await?; let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?"); - if len > MAX_LIBP2P_MESSAGE_SIZE { - Err(io::Error::other("request length exceeded MAX_LIBP2P_MESSAGE_SIZE"))?; + if len > MAX_LIBP2P_REQRES_MESSAGE_SIZE { + Err(io::Error::other("request length exceeded MAX_LIBP2P_REQRES_MESSAGE_SIZE"))?; } // This may be a non-trivial allocation easily causable // While we could chunk the read, meaning we only perform the allocation as bandwidth is used, @@ -323,7 +331,7 @@ impl LibP2p { .heartbeat_interval(Duration::from_millis(heartbeat_interval.into())) .history_length(heartbeats_per_block * 2) .history_gossip(heartbeats_per_block) - .max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE) + .max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE) // We send KeepAlive after 80s .idle_timeout(Duration::from_secs(85)) .validation_mode(ValidationMode::Strict) @@ -363,10 +371,11 @@ impl LibP2p { .with_tcp(TcpConfig::default().nodelay(true), noise::Config::new, || { let mut config = yamux::Config::default(); // 1 MiB default + max message size - config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE); + config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_REQRES_MESSAGE_SIZE); // 256 KiB default + max message size - config - .set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap()); + config.set_receive_window_size( + ((256 * 1024) + MAX_LIBP2P_REQRES_MESSAGE_SIZE).try_into().unwrap(), + ); config }) .unwrap() @@ -931,14 +940,13 @@ pub async fn handle_p2p_task( // prepare the batch to sends let mut blocks = vec![]; for (i, next) in to_send.iter().enumerate() { - // each batch contains BLOCKS_PER_MINUTE + 1 blocks - if i >= usize::try_from(BLOCKS_PER_MINUTE).unwrap() + 1 { + if i >= BLOCKS_PER_BATCH { break; } blocks.push(BlockCommit { - block: reader.block(&next).unwrap().serialize(), - commit: reader.commit(&next).unwrap(), + block: reader.block(next).unwrap().serialize(), + commit: reader.commit(next).unwrap(), }); } let batch = HeartbeatBatch { blocks, timestamp: msg_time };