Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tendermint chain sync #581

Merged
merged 3 commits into from
Jul 16, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 58 additions & 23 deletions coordinator/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use async_trait::async_trait;
use rand_core::{RngCore, OsRng};

use scale::Encode;
use scale::{Decode, Encode};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};

Expand All @@ -29,7 +29,7 @@ use libp2p::{
noise, yamux,
request_response::{
Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig,
Behaviour as RrBehavior,
Behaviour as RrBehavior, ProtocolSupport,
},
gossipsub::{
IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
Expand All @@ -48,6 +48,9 @@ use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
akildemir marked this conversation as resolved.
Show resolved Hide resolved
const LIBP2P_TOPIC: &str = "serai-coordinator";

// amount of blocks in a minute
const BLOCKS_PER_MINUTE: u32 = 60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000);

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
pub struct CosignedBlock {
pub network: NetworkId,
Expand Down Expand Up @@ -173,6 +176,18 @@ pub struct Message<P: P2p> {
pub msg: Vec<u8>,
}

#[derive(Clone, Debug, Encode, Decode)]
pub struct BlockCommit {
pub block: Vec<u8>,
pub commit: Vec<u8>,
}

#[derive(Clone, Debug, Encode, Decode)]
pub struct HeartbeatBatch {
pub blocks: Vec<BlockCommit>,
pub timestamp: u64,
}

#[async_trait]
pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
type Id: Send + Sync + Clone + Copy + fmt::Debug;
Expand Down Expand Up @@ -297,7 +312,7 @@ impl LibP2p {
let throwaway_key_pair = Keypair::generate_ed25519();

let behavior = Behavior {
reqres: { RrBehavior::new([], RrConfig::default()) },
reqres: { RrBehavior::new([("/coordinator", ProtocolSupport::Full)], RrConfig::default()) },
gossipsub: {
let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2;
let heartbeats_per_block =
Expand Down Expand Up @@ -868,7 +883,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
let p2p = p2p.clone();
async move {
loop {
let Some(mut msg) = recv.recv().await else {
let Some(msg) = recv.recv().await else {
// Channel closure happens when the tributary retires
break;
};
Expand Down Expand Up @@ -913,34 +928,54 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
latest = next;
}
if to_send.len() > 3 {
for next in to_send {
let mut res = reader.block(&next).unwrap().serialize();
res.extend(reader.commit(&next).unwrap());
// Also include the timestamp used within the Heartbeat
res.extend(&msg.msg[32 .. 40]);
p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await;
// prepare the batch to sends
let mut blocks = vec![];
for (i, next) in to_send.iter().enumerate() {
// each batch contains BLOCKS_PER_MINUTE + 1 blocks
if i >= usize::try_from(BLOCKS_PER_MINUTE).unwrap() + 1 {
break;
}

blocks.push(BlockCommit {
block: reader.block(&next).unwrap().serialize(),
commit: reader.commit(&next).unwrap(),
});
}
let batch = HeartbeatBatch { blocks, timestamp: msg_time };

p2p
.send(msg.sender, ReqResMessageKind::Block(genesis), batch.encode())
.await;
}
});
}

P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
assert_eq!(msg_genesis, genesis);
let mut msg_ref: &[u8] = msg.msg.as_ref();
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
log::error!("received block message with an invalidly serialized block");
// decode the batch
let Ok(batch) = HeartbeatBatch::decode(&mut msg.msg.as_ref()) else {
log::error!(
"received HeartBeatBatch message with an invalidly serialized batch"
);
continue;
};
// Get just the commit
msg.msg.drain(.. (msg.msg.len() - msg_ref.len()));
msg.msg.drain((msg.msg.len() - 8) ..);

let res = tributary.tributary.sync_block(block, msg.msg).await;
log::debug!(
"received block from {:?}, sync_block returned {}",
msg.sender,
res
);

// sync blocks
for bc in batch.blocks {
// TODO: why do we use ReadWrite instead of Encode/Decode for blocks?
// Should we use the same for batches so we can read both at the same time?
let Ok(block) = Block::<Transaction>::read(&mut bc.block.as_slice()) else {
log::error!("received block message with an invalidly serialized block");
continue;
};

let res = tributary.tributary.sync_block(block, bc.commit).await;
log::debug!(
"received block from {:?}, sync_block returned {}",
msg.sender,
res
);
}
}

P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
Expand Down
Loading