Skip to content

Commit

Permalink
serai-dex#560 take two, now that serai-dex#560 has been reverted (ser…
Browse files Browse the repository at this point in the history
…ai-dex#561)

* Clear upons upon round, not block

* Cache the proposal for a round

* Rebase onto develop, which reverted this PR, and re-apply this PR

* Set participation upon participation instead of constantly recalculating

* Cache message instances

* Add missing txn commit

Identified by @akildemir.

* Correct clippy lint identified upon rebase

* Fix tendermint chain sync (serai-dex#581)

* fix p2p Reqres protocol

* stabilize tributary chain sync

* fix pr comments

---------

Co-authored-by: akildemir <[email protected]>
  • Loading branch information
kayabaNerve and akildemir authored Jul 16, 2024
1 parent c0200df commit e772b8a
Show file tree
Hide file tree
Showing 11 changed files with 663 additions and 600 deletions.
1 change: 1 addition & 0 deletions common/db/src/parity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub use ::parity_db::{Options, Db as ParityDb};

use crate::*;

#[must_use]
pub struct Transaction<'a>(&'a Arc<ParityDb>, Vec<(u8, Vec<u8>, Option<Vec<u8>>)>);

impl Get for Transaction<'_> {
Expand Down
1 change: 1 addition & 0 deletions common/db/src/rocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rocksdb::{

use crate::*;

#[must_use]
pub struct Transaction<'a, T: ThreadMode>(
RocksTransaction<'a, OptimisticTransactionDB<T>>,
&'a OptimisticTransactionDB<T>,
Expand Down
103 changes: 73 additions & 30 deletions coordinator/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use async_trait::async_trait;
use rand_core::{RngCore, OsRng};

use scale::Encode;
use scale::{Decode, Encode};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};

Expand All @@ -29,7 +29,7 @@ use libp2p::{
noise, yamux,
request_response::{
Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig,
Behaviour as RrBehavior,
Behaviour as RrBehavior, ProtocolSupport,
},
gossipsub::{
IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
Expand All @@ -45,9 +45,20 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p};
use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};

// Block size limit + 1 KB of space for signatures/metadata
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;

const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
(tributary::BLOCK_SIZE_LIMIT * BLOCKS_PER_BATCH) + 1024;

const LIBP2P_TOPIC: &str = "serai-coordinator";

// Amount of blocks in a minute
// We can't use tendermint::TARGET_BLOCK_TIME here to calculate this since that is a u32.
const BLOCKS_PER_MINUTE: usize = 10;

// Maximum amount of blocks to send in a batch
const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
pub struct CosignedBlock {
pub network: NetworkId,
Expand Down Expand Up @@ -173,6 +184,18 @@ pub struct Message<P: P2p> {
pub msg: Vec<u8>,
}

#[derive(Clone, Debug, Encode, Decode)]
pub struct BlockCommit {
pub block: Vec<u8>,
pub commit: Vec<u8>,
}

#[derive(Clone, Debug, Encode, Decode)]
pub struct HeartbeatBatch {
pub blocks: Vec<BlockCommit>,
pub timestamp: u64,
}

#[async_trait]
pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
type Id: Send + Sync + Clone + Copy + fmt::Debug;
Expand Down Expand Up @@ -228,8 +251,8 @@ impl RrCodecTrait for RrCodec {
let mut len = [0; 4];
io.read_exact(&mut len).await?;
let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?");
if len > MAX_LIBP2P_MESSAGE_SIZE {
Err(io::Error::other("request length exceeded MAX_LIBP2P_MESSAGE_SIZE"))?;
if len > MAX_LIBP2P_REQRES_MESSAGE_SIZE {
Err(io::Error::other("request length exceeded MAX_LIBP2P_REQRES_MESSAGE_SIZE"))?;
}
// This may be a non-trivial allocation easily causable
// While we could chunk the read, meaning we only perform the allocation as bandwidth is used,
Expand Down Expand Up @@ -297,7 +320,7 @@ impl LibP2p {
let throwaway_key_pair = Keypair::generate_ed25519();

let behavior = Behavior {
reqres: { RrBehavior::new([], RrConfig::default()) },
reqres: { RrBehavior::new([("/coordinator", ProtocolSupport::Full)], RrConfig::default()) },
gossipsub: {
let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2;
let heartbeats_per_block =
Expand All @@ -308,7 +331,7 @@ impl LibP2p {
.heartbeat_interval(Duration::from_millis(heartbeat_interval.into()))
.history_length(heartbeats_per_block * 2)
.history_gossip(heartbeats_per_block)
.max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE)
.max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE)
// We send KeepAlive after 80s
.idle_timeout(Duration::from_secs(85))
.validation_mode(ValidationMode::Strict)
Expand Down Expand Up @@ -348,10 +371,11 @@ impl LibP2p {
.with_tcp(TcpConfig::default().nodelay(true), noise::Config::new, || {
let mut config = yamux::Config::default();
// 1 MiB default + max message size
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_REQRES_MESSAGE_SIZE);
// 256 KiB default + max message size
config
.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
config.set_receive_window_size(
((256 * 1024) + MAX_LIBP2P_REQRES_MESSAGE_SIZE).try_into().unwrap(),
);
config
})
.unwrap()
Expand Down Expand Up @@ -868,7 +892,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
let p2p = p2p.clone();
async move {
loop {
let Some(mut msg) = recv.recv().await else {
let Some(msg) = recv.recv().await else {
// Channel closure happens when the tributary retires
break;
};
Expand Down Expand Up @@ -913,34 +937,53 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
latest = next;
}
if to_send.len() > 3 {
for next in to_send {
let mut res = reader.block(&next).unwrap().serialize();
res.extend(reader.commit(&next).unwrap());
// Also include the timestamp used within the Heartbeat
res.extend(&msg.msg[32 .. 40]);
p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await;
// prepare the batch to sends
let mut blocks = vec![];
for (i, next) in to_send.iter().enumerate() {
if i >= BLOCKS_PER_BATCH {
break;
}

blocks.push(BlockCommit {
block: reader.block(next).unwrap().serialize(),
commit: reader.commit(next).unwrap(),
});
}
let batch = HeartbeatBatch { blocks, timestamp: msg_time };

p2p
.send(msg.sender, ReqResMessageKind::Block(genesis), batch.encode())
.await;
}
});
}

P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
assert_eq!(msg_genesis, genesis);
let mut msg_ref: &[u8] = msg.msg.as_ref();
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
log::error!("received block message with an invalidly serialized block");
// decode the batch
let Ok(batch) = HeartbeatBatch::decode(&mut msg.msg.as_ref()) else {
log::error!(
"received HeartBeatBatch message with an invalidly serialized batch"
);
continue;
};
// Get just the commit
msg.msg.drain(.. (msg.msg.len() - msg_ref.len()));
msg.msg.drain((msg.msg.len() - 8) ..);

let res = tributary.tributary.sync_block(block, msg.msg).await;
log::debug!(
"received block from {:?}, sync_block returned {}",
msg.sender,
res
);

// sync blocks
for bc in batch.blocks {
// TODO: why do we use ReadWrite instead of Encode/Decode for blocks?
// Should we use the same for batches so we can read both at the same time?
let Ok(block) = Block::<Transaction>::read(&mut bc.block.as_slice()) else {
log::error!("received block message with an invalidly serialized block");
continue;
};

let res = tributary.tributary.sync_block(block, bc.commit).await;
log::debug!(
"received block from {:?}, sync_block returned {}",
msg.sender,
res
);
}
}

P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
Expand Down
34 changes: 2 additions & 32 deletions coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::{marker::PhantomData, fmt::Debug};
use std::{sync::Arc, io, collections::VecDeque};
use std::{sync::Arc, io};

use async_trait::async_trait;

Expand Down Expand Up @@ -154,14 +154,6 @@ pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,

p2p_meta_task_handle: Arc<tokio::task::AbortHandle>,
}

impl<D: Db, T: TransactionTrait, P: P2p> Drop for Tributary<D, T, P> {
fn drop(&mut self) {
self.p2p_meta_task_handle.abort();
}
}

impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
Expand Down Expand Up @@ -193,28 +185,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
);
let blockchain = Arc::new(RwLock::new(blockchain));

let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new()));
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
// P2P layer
let p2p_meta_task_handle = Arc::new(
tokio::spawn({
let to_rebroadcast = to_rebroadcast.clone();
let p2p = p2p.clone();
async move {
loop {
let to_rebroadcast = to_rebroadcast.read().await.clone();
for msg in to_rebroadcast {
p2p.broadcast(genesis, msg).await;
}
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
}
}
})
.abort_handle(),
);

let network =
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };

let TendermintHandle { synced_block, synced_block_result, messages, machine } =
TendermintMachine::new(
Expand All @@ -235,7 +206,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
synced_block: Arc::new(RwLock::new(synced_block)),
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
messages: Arc::new(RwLock::new(messages)),
p2p_meta_task_handle,
})
}

Expand Down
32 changes: 2 additions & 30 deletions coordinator/tributary/src/tendermint/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use core::ops::Deref;
use std::{
sync::Arc,
collections::{VecDeque, HashMap},
};
use std::{sync::Arc, collections::HashMap};

use async_trait::async_trait;

Expand Down Expand Up @@ -270,8 +267,6 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
pub(crate) validators: Arc<Validators>,
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,

pub(crate) to_rebroadcast: Arc<RwLock<VecDeque<Vec<u8>>>>,

pub(crate) p2p: P,
}

Expand Down Expand Up @@ -308,26 +303,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
to_broadcast.extend(msg.encode());

// Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second
// until the block it's trying to build is complete
// If the P2P layer drops a message before all nodes obtained access, or a node had an
// intermittent failure, this will ensure reconcilliation
// This is atrocious if there's no content-based deduplication protocol for messages actively
// being gossiped
// LibP2p, as used by Serai, is configured to content-based deduplicate
{
let mut to_rebroadcast_lock = self.to_rebroadcast.write().await;
to_rebroadcast_lock.push_back(to_broadcast.clone());
// We should have, ideally, 3 * validators messages within a round
// Therefore, this should keep the most recent 2-rounds
// TODO: This isn't perfect. Each participant should just rebroadcast their latest round of
// messages
while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) {
to_rebroadcast_lock.pop_front();
}
}

self.p2p.broadcast(self.genesis, to_broadcast).await
}

Expand Down Expand Up @@ -366,7 +341,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}
}

async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> {
async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> {
let block =
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
self
Expand Down Expand Up @@ -428,9 +403,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}
}

// Since we've added a valid block, clear to_rebroadcast
*self.to_rebroadcast.write().await = VecDeque::new();

Some(TendermintBlock(
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
))
Expand Down
17 changes: 8 additions & 9 deletions coordinator/tributary/tendermint/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
collections::{HashSet, HashMap},
};

use parity_scale_codec::Encode;
use serai_db::{Get, DbTxn, Db};

use crate::{
Expand All @@ -20,7 +19,7 @@ pub(crate) struct BlockData<N: Network> {

pub(crate) number: BlockNumber,
pub(crate) validator_id: Option<N::ValidatorId>,
pub(crate) proposal: Option<N::Block>,
pub(crate) our_proposal: Option<N::Block>,

pub(crate) log: MessageLog<N>,
pub(crate) slashes: HashSet<N::ValidatorId>,
Expand All @@ -43,15 +42,15 @@ impl<N: Network> BlockData<N> {
weights: Arc<N::Weights>,
number: BlockNumber,
validator_id: Option<N::ValidatorId>,
proposal: Option<N::Block>,
our_proposal: Option<N::Block>,
) -> BlockData<N> {
BlockData {
db,
genesis,

number,
validator_id,
proposal,
our_proposal,

log: MessageLog::new(weights),
slashes: HashSet::new(),
Expand Down Expand Up @@ -108,17 +107,17 @@ impl<N: Network> BlockData<N> {
self.populate_end_time(round);
}

// 11-13
// L11-13
self.round = Some(RoundData::<N>::new(
round,
time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]),
));
self.end_time.insert(round, self.round().end_time());

// 14-21
// L14-21
if Some(proposer) == self.validator_id {
let (round, block) = self.valid.clone().unzip();
block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block))
block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block))
} else {
self.round_mut().set_timeout(Step::Propose);
None
Expand Down Expand Up @@ -198,8 +197,8 @@ impl<N: Network> BlockData<N> {
assert!(!new_round);
None?;
}
// Put this message to the DB
txn.put(&msg_key, res.encode());
// Put that we're sending this message to the DB
txn.put(&msg_key, []);

txn.commit();
}
Expand Down
Loading

0 comments on commit e772b8a

Please sign in to comment.