From 7a59c4d4d2f3d3dcf55880e9a3fd12a5a73dc25e Mon Sep 17 00:00:00 2001 From: Martin Stefcek <35243812+Cifko@users.noreply.github.com> Date: Mon, 11 Dec 2023 10:28:15 +0100 Subject: [PATCH] feat: request foreign blocks (#760) Description --- When VN receives foreign proposal that was not expected, it requests all the missing proposals. They are send in order. So the node can process them as they come. Motivation and Context --- How Has This Been Tested? --- Not tested. What process can a PR reviewer use to test or verify this change? --- You can change the code to VN to not receive foreign proposal and then request it again later when another foreign proposal comes in. Breaking Changes --- - [x] None - [ ] Requires data directory to be deleted - [ ] Other - Please specify --- dan_layer/consensus/src/hotstuff/error.rs | 2 + dan_layer/consensus/src/hotstuff/mod.rs | 1 + .../hotstuff/on_receive_foreign_proposal.rs | 29 ++++++-- ..._receive_request_missing_foreign_blocks.rs | 66 +++++++++++++++++++ dan_layer/consensus/src/hotstuff/worker.rs | 17 ++++- dan_layer/consensus/src/messages/message.rs | 14 +++- dan_layer/consensus/src/messages/mod.rs | 3 + .../request_missing_foreign_blocks.rs | 12 ++++ .../up.sql | 10 +++ dan_layer/state_store_sqlite/src/reader.rs | 40 ++++++++++- dan_layer/state_store_sqlite/src/schema.rs | 11 ++++ dan_layer/state_store_sqlite/src/writer.rs | 17 ++++- dan_layer/storage/src/state_store/mod.rs | 8 ++- .../validator_node_rpc/proto/consensus.proto | 7 ++ .../src/conversions/consensus.rs | 30 +++++++++ 15 files changed, 257 insertions(+), 10 deletions(-) create mode 100644 dan_layer/consensus/src/hotstuff/on_receive_request_missing_foreign_blocks.rs create mode 100644 dan_layer/consensus/src/messages/request_missing_foreign_blocks.rs diff --git a/dan_layer/consensus/src/hotstuff/error.rs b/dan_layer/consensus/src/hotstuff/error.rs index 972d3ca0c..1dc15c06d 100644 --- a/dan_layer/consensus/src/hotstuff/error.rs +++ b/dan_layer/consensus/src/hotstuff/error.rs @@ -146,4 +146,6 @@ pub enum ProposalValidationError { }, #[error("Proposed block {block_id} {height} already has been processed")] BlockAlreadyProcessed { block_id: BlockId, height: NodeHeight }, + #[error("Internal channel send error when {context}")] + InternalChannelClosed { context: &'static str }, } diff --git a/dan_layer/consensus/src/hotstuff/mod.rs b/dan_layer/consensus/src/hotstuff/mod.rs index f72fd3db6..4ca3a2501 100644 --- a/dan_layer/consensus/src/hotstuff/mod.rs +++ b/dan_layer/consensus/src/hotstuff/mod.rs @@ -14,6 +14,7 @@ mod on_ready_to_vote_on_local_block; mod on_receive_foreign_proposal; mod on_receive_local_proposal; mod on_receive_new_view; +mod on_receive_request_missing_foreign_blocks; mod on_receive_request_missing_transactions; mod on_receive_requested_transactions; mod on_receive_vote; diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index 0b5280c7d..2f4cf04a6 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -16,10 +16,11 @@ use tari_dan_storage::{ StateStore, }; use tari_epoch_manager::EpochManagerReader; +use tokio::sync::mpsc; use crate::{ hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle, ProposalValidationError}, - messages::ProposalMessage, + messages::{HotstuffMessage, ProposalMessage, RequestMissingForeignBlocksMessage}, traits::ConsensusSpec, }; @@ -31,6 +32,7 @@ pub struct OnReceiveForeignProposalHandler { transaction_pool: TransactionPool, pacemaker: PaceMakerHandle, foreign_receive_counter: ForeignReceiveCounters, + tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>, } impl OnReceiveForeignProposalHandler @@ -42,6 +44,7 @@ where TConsensusSpec: ConsensusSpec transaction_pool: TransactionPool, pacemaker: PaceMakerHandle, foreign_receive_counter: ForeignReceiveCounters, + tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>, ) -> Self { Self { store, @@ -49,6 +52,7 @@ where TConsensusSpec: ConsensusSpec transaction_pool, pacemaker, foreign_receive_counter, + tx_leader, } } @@ -74,7 +78,8 @@ where TConsensusSpec: ConsensusSpec .get_committee_shard(block.epoch(), vn.shard_key) .await?; let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; - self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?; + self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket()) + .await?; // Is this ok? Can foreign node send invalid block that should still increment the counter? self.foreign_receive_counter.increment(&committee_shard.bucket()); self.store.with_write_tx(|tx| { @@ -153,7 +158,7 @@ where TConsensusSpec: ConsensusSpec Ok(()) } - fn validate_proposed_block( + async fn validate_proposed_block( &self, from: &TConsensusSpec::Addr, candidate_block: &Block, @@ -172,8 +177,22 @@ where TConsensusSpec: ConsensusSpec }; let current_index = self.foreign_receive_counter.get_index(&foreign_bucket); if current_index + 1 != incoming_index { - debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was - {incoming_index}", expected_index = current_index + 1); + debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was {incoming_index}", expected_index = current_index + 1); + if current_index < incoming_index { + self.tx_leader + .send(( + from.clone(), + HotstuffMessage::RequestMissingForeignBlocks(RequestMissingForeignBlocksMessage { + epoch: candidate_block.epoch(), + from: current_index + 1, + to: incoming_index, + }), + )) + .await + .map_err(|_| ProposalValidationError::InternalChannelClosed { + context: "tx_leader in OnNextSyncViewHandler::send_to_leader", + })?; + } return Err(ProposalValidationError::InvalidForeignCounters { proposed_by: from.to_string(), hash: *candidate_block.id(), diff --git a/dan_layer/consensus/src/hotstuff/on_receive_request_missing_foreign_blocks.rs b/dan_layer/consensus/src/hotstuff/on_receive_request_missing_foreign_blocks.rs new file mode 100644 index 000000000..196c4edc6 --- /dev/null +++ b/dan_layer/consensus/src/hotstuff/on_receive_request_missing_foreign_blocks.rs @@ -0,0 +1,66 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use log::*; +use tari_dan_storage::{StateStore, StateStoreReadTransaction}; +use tari_epoch_manager::EpochManagerReader; +use tokio::sync::mpsc; + +use crate::{ + hotstuff::error::HotStuffError, + messages::{HotstuffMessage, ProposalMessage, RequestMissingForeignBlocksMessage}, + traits::ConsensusSpec, +}; + +const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::on_receive_request_missing_transactions"; + +pub struct OnReceiveRequestMissingForeignBlocksHandler { + store: TConsensusSpec::StateStore, + epoch_manager: TConsensusSpec::EpochManager, + tx_request_missing_foreign_blocks: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>, +} + +impl OnReceiveRequestMissingForeignBlocksHandler +where TConsensusSpec: ConsensusSpec +{ + pub fn new( + store: TConsensusSpec::StateStore, + epoch_manager: TConsensusSpec::EpochManager, + tx_request_missing_foreign_blocks: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>, + ) -> Self { + Self { + store, + epoch_manager, + tx_request_missing_foreign_blocks, + } + } + + pub async fn handle( + &mut self, + from: TConsensusSpec::Addr, + msg: RequestMissingForeignBlocksMessage, + ) -> Result<(), HotStuffError> { + debug!(target: LOG_TARGET, "{} is requesting {}..{} missing blocks from epoch {}", from, msg.from, msg.to, msg.epoch); + let foreign_shard = self + .epoch_manager + .get_committee_shard_by_validator_address(msg.epoch, &from) + .await?; + let missing_blocks = self + .store + .with_read_tx(|tx| tx.blocks_get_foreign_ids(foreign_shard.bucket(), msg.from, msg.to))?; + for block in missing_blocks { + // We send the proposal back to the requester via hotstuff, so they follow the normal path including + // validation. + self.tx_request_missing_foreign_blocks + .send(( + from.clone(), + HotstuffMessage::ForeignProposal(ProposalMessage { block: block.clone() }), + )) + .await + .map_err(|_| HotStuffError::InternalChannelClosed { + context: "tx_leader in OnReceiveRequestMissingForeignBlocksHandler::handle", + })?; + } + Ok(()) + } +} diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 635afff0f..74f1ed86c 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -19,7 +19,11 @@ use tari_shutdown::ShutdownSignal; use tari_transaction::{Transaction, TransactionId}; use tokio::sync::{broadcast, mpsc}; -use super::{on_receive_requested_transactions::OnReceiveRequestedTransactions, proposer::Proposer}; +use super::{ + on_receive_request_missing_foreign_blocks::OnReceiveRequestMissingForeignBlocksHandler, + on_receive_requested_transactions::OnReceiveRequestedTransactions, + proposer::Proposer, +}; use crate::{ hotstuff::{ common::CommitteeAndMessage, @@ -56,6 +60,7 @@ pub struct HotstuffWorker { on_receive_foreign_proposal: OnReceiveForeignProposalHandler, on_receive_vote: OnReceiveVoteHandler, on_receive_new_view: OnReceiveNewViewHandler, + on_receive_request_missing_foreign_blocks: OnReceiveRequestMissingForeignBlocksHandler, on_receive_request_missing_txs: OnReceiveRequestMissingTransactions, on_receive_requested_txs: OnReceiveRequestedTransactions, on_propose: OnPropose, @@ -139,6 +144,7 @@ impl HotstuffWorker { transaction_pool.clone(), pacemaker.clone_handle(), foreign_receive_counter, + tx_leader.clone(), ), on_receive_vote: OnReceiveVoteHandler::new(vote_receiver.clone()), on_receive_new_view: OnReceiveNewViewHandler::new( @@ -148,6 +154,11 @@ impl HotstuffWorker { pacemaker.clone_handle(), vote_receiver, ), + on_receive_request_missing_foreign_blocks: OnReceiveRequestMissingForeignBlocksHandler::new( + state_store.clone(), + epoch_manager.clone(), + tx_leader.clone(), + ), on_receive_request_missing_txs: OnReceiveRequestMissingTransactions::new( state_store.clone(), tx_leader.clone(), @@ -474,6 +485,10 @@ where TConsensusSpec: ConsensusSpec ); Ok(()) }, + HotstuffMessage::RequestMissingForeignBlocks(msg) => log_err( + "on_receive_request_missing_foreign_blocks", + self.on_receive_request_missing_foreign_blocks.handle(from, msg).await, + ), } } diff --git a/dan_layer/consensus/src/messages/message.rs b/dan_layer/consensus/src/messages/message.rs index 19651ea9d..b2e2b8d03 100644 --- a/dan_layer/consensus/src/messages/message.rs +++ b/dan_layer/consensus/src/messages/message.rs @@ -6,7 +6,13 @@ use std::fmt::Display; use serde::Serialize; use tari_dan_common_types::Epoch; -use super::{NewViewMessage, ProposalMessage, RequestedTransactionMessage, VoteMessage}; +use super::{ + NewViewMessage, + ProposalMessage, + RequestMissingForeignBlocksMessage, + RequestedTransactionMessage, + VoteMessage, +}; use crate::messages::{RequestMissingTransactionsMessage, SyncRequestMessage, SyncResponseMessage}; // Serialize is implemented for the message logger @@ -20,6 +26,7 @@ pub enum HotstuffMessage { RequestedTransaction(RequestedTransactionMessage), SyncRequest(SyncRequestMessage), SyncResponse(SyncResponseMessage), + RequestMissingForeignBlocks(RequestMissingForeignBlocksMessage), } impl HotstuffMessage { @@ -33,6 +40,7 @@ impl HotstuffMessage { HotstuffMessage::RequestedTransaction(_) => "RequestedTransaction", HotstuffMessage::SyncRequest(_) => "SyncRequest", HotstuffMessage::SyncResponse(_) => "SyncResponse", + HotstuffMessage::RequestMissingForeignBlocks(_) => "RequestMissingForeignBlocks", } } @@ -46,6 +54,7 @@ impl HotstuffMessage { Self::RequestedTransaction(msg) => msg.epoch, Self::SyncRequest(msg) => msg.epoch, Self::SyncResponse(msg) => msg.epoch, + Self::RequestMissingForeignBlocks(msg) => msg.epoch, } } @@ -70,6 +79,9 @@ impl Display for HotstuffMessage { HotstuffMessage::RequestedTransaction(msg) => write!(f, "RequestedTransaction({})", msg.transactions.len()), HotstuffMessage::SyncRequest(msg) => write!(f, "SyncRequest({})", msg.high_qc), HotstuffMessage::SyncResponse(msg) => write!(f, "SyncResponse({} block(s))", msg.blocks.len()), + HotstuffMessage::RequestMissingForeignBlocks(msg) => { + write!(f, "RequestMissingForeignBlocks({}..{})", msg.from, msg.to) + }, } } } diff --git a/dan_layer/consensus/src/messages/mod.rs b/dan_layer/consensus/src/messages/mod.rs index eab49ccd8..1fcc621a1 100644 --- a/dan_layer/consensus/src/messages/mod.rs +++ b/dan_layer/consensus/src/messages/mod.rs @@ -12,6 +12,9 @@ pub use proposal::*; mod vote; pub use vote::*; +mod request_missing_foreign_blocks; +pub use request_missing_foreign_blocks::*; + mod request_missing_transaction; pub use request_missing_transaction::*; diff --git a/dan_layer/consensus/src/messages/request_missing_foreign_blocks.rs b/dan_layer/consensus/src/messages/request_missing_foreign_blocks.rs new file mode 100644 index 000000000..951abd30f --- /dev/null +++ b/dan_layer/consensus/src/messages/request_missing_foreign_blocks.rs @@ -0,0 +1,12 @@ +// Copyright 2023 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use serde::Serialize; +use tari_dan_common_types::Epoch; + +#[derive(Debug, Clone, Serialize)] +pub struct RequestMissingForeignBlocksMessage { + pub epoch: Epoch, + pub from: u64, + pub to: u64, +} diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index ebcd158b5..336018610 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -271,6 +271,16 @@ CREATE TABLE foreign_receive_counters created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP ); +CREATE TABLE blocks_foreign_id_mapping +( + id integer not NULL primary key AUTOINCREMENT, + foreign_bucket integer not NULL, + foreign_index integer not NULL, + block_id text not NULL, + created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (block_id) REFERENCES blocks (block_id) +); + -- Debug Triggers CREATE TABLE transaction_pool_history ( diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 669eef4a8..5190f355f 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -26,7 +26,7 @@ use diesel::{ use log::*; use serde::{de::DeserializeOwned, Serialize}; use tari_common_types::types::FixedHash; -use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, ShardId}; +use tari_dan_common_types::{shard_bucket::ShardBucket, Epoch, NodeAddressable, NodeHeight, ShardId}; use tari_dan_storage::{ consensus_models::{ Block, @@ -580,6 +580,44 @@ impl StateStoreReadTransa block.try_convert(qc) } + fn blocks_get_foreign_ids( + &mut self, + bucket: ShardBucket, + from: u64, + to: u64, + ) -> Result>, StorageError> { + use crate::schema::{blocks, blocks_foreign_id_mapping, quorum_certificates}; + // TODO: how slow is this? is it worth splitting into 2 queries? + let results = blocks::table + .left_join(blocks_foreign_id_mapping::table.on(blocks::block_id.eq(blocks_foreign_id_mapping::block_id))) + .left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id))) + .filter(blocks_foreign_id_mapping::foreign_bucket.eq(i64::from(bucket.as_u32()))) + .filter(blocks_foreign_id_mapping::foreign_index.ge(from as i64)) + .filter(blocks_foreign_id_mapping::foreign_index.le(to as i64)) + .select((blocks::all_columns, quorum_certificates::all_columns.nullable())) + .order_by(blocks_foreign_id_mapping::foreign_index.asc()) + .get_results::<(sql_models::Block, Option)>(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "blocks_all_after_height", + source: e, + })?; + + results + .into_iter() + .map(|(block, qc)| { + let qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency { + operation: "blocks_get_foreign_ids", + details: format!( + "block {} references non-existent quorum certificate {}", + block.block_id, block.qc_id + ), + })?; + + block.try_convert(qc) + }) + .collect() + } + fn blocks_get_tip(&mut self) -> Result, StorageError> { use crate::schema::{blocks, quorum_certificates}; diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index 9c2018717..8313204a6 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -58,6 +58,16 @@ diesel::table! { } } +diesel::table! { + blocks_foreign_id_mapping (id) { + id -> Integer, + foreign_bucket -> BigInt, + foreign_index -> BigInt, + block_id -> Text, + created_at -> Timestamp, + } +} + diesel::table! { last_executed (id) { id -> Integer, @@ -278,6 +288,7 @@ diesel::table! { diesel::allow_tables_to_appear_in_same_query!( blocks, + blocks_foreign_id_mapping, high_qcs, last_executed, last_proposed, diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 59a259f8b..75f9c653c 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -176,7 +176,7 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit } fn blocks_insert(&mut self, block: &Block) -> Result<(), StorageError> { - use crate::schema::blocks; + use crate::schema::{blocks, blocks_foreign_id_mapping}; let insert = ( blocks::block_id.eq(serialize_hex(block.id())), @@ -201,6 +201,21 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit source: e, })?; + for (bucket, index) in block.get_foreign_indexes() { + let insert = ( + blocks_foreign_id_mapping::foreign_bucket.eq(i64::from(bucket.as_u32())), + blocks_foreign_id_mapping::foreign_index.eq(*index as i64), + blocks_foreign_id_mapping::block_id.eq(serialize_hex(block.id())), + ); + diesel::insert_into(blocks_foreign_id_mapping::table) + .values(insert) + .execute(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "blocks_insert", + source: e, + })?; + } + Ok(()) } diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 49eaa1e73..c1909f224 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -9,7 +9,7 @@ use std::{ use serde::{Deserialize, Serialize}; use tari_common_types::types::FixedHash; -use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, ShardId}; +use tari_dan_common_types::{shard_bucket::ShardBucket, Epoch, NodeAddressable, NodeHeight, ShardId}; use tari_transaction::{Transaction, TransactionId}; use crate::{ @@ -116,6 +116,12 @@ pub trait StateStoreReadTransaction { asc_desc_created_at: Option, ) -> Result, StorageError>; fn blocks_get(&mut self, block_id: &BlockId) -> Result, StorageError>; + fn blocks_get_foreign_ids( + &mut self, + bucket: ShardBucket, + from: u64, + to: u64, + ) -> Result>, StorageError>; fn blocks_get_tip(&mut self) -> Result, StorageError>; fn blocks_get_all_between( &mut self, diff --git a/dan_layer/validator_node_rpc/proto/consensus.proto b/dan_layer/validator_node_rpc/proto/consensus.proto index 4117414fb..a5a564e1d 100644 --- a/dan_layer/validator_node_rpc/proto/consensus.proto +++ b/dan_layer/validator_node_rpc/proto/consensus.proto @@ -18,6 +18,7 @@ message HotStuffMessage { RequestedTransactionMessage requested_transaction = 6; SyncRequest sync_request = 7; SyncResponse sync_response = 8; + RequestMissingForeignBlocksMessage request_missing_foreign_blocks = 9; } } @@ -165,6 +166,12 @@ message DownState { uint64 fees_accrued = 2; } +message RequestMissingForeignBlocksMessage { + uint64 epoch = 1; + uint64 from = 2; + uint64 to = 3; +} + message RequestMissingTransactionsMessage { uint64 epoch = 1; bytes block_id = 2; diff --git a/dan_layer/validator_node_rpc/src/conversions/consensus.rs b/dan_layer/validator_node_rpc/src/conversions/consensus.rs index 06de07cc3..1dd4c6de8 100644 --- a/dan_layer/validator_node_rpc/src/conversions/consensus.rs +++ b/dan_layer/validator_node_rpc/src/conversions/consensus.rs @@ -30,6 +30,7 @@ use tari_consensus::messages::{ HotstuffMessage, NewViewMessage, ProposalMessage, + RequestMissingForeignBlocksMessage, RequestMissingTransactionsMessage, RequestedTransactionMessage, SyncRequestMessage, @@ -68,6 +69,9 @@ impl From<&HotstuffMessage> for proto::consensus: proto::consensus::hot_stuff_message::Message::ForeignProposal(msg.into()) }, HotstuffMessage::Vote(msg) => proto::consensus::hot_stuff_message::Message::Vote(msg.into()), + HotstuffMessage::RequestMissingForeignBlocks(msg) => { + proto::consensus::hot_stuff_message::Message::RequestMissingForeignBlocks(msg.into()) + }, HotstuffMessage::RequestMissingTransactions(msg) => { proto::consensus::hot_stuff_message::Message::RequestMissingTransactions(msg.into()) }, @@ -98,6 +102,9 @@ impl TryFrom { HotstuffMessage::RequestMissingTransactions(msg.try_into()?) }, + proto::consensus::hot_stuff_message::Message::RequestMissingForeignBlocks(msg) => { + HotstuffMessage::RequestMissingForeignBlocks(msg.try_into()?) + }, proto::consensus::hot_stuff_message::Message::RequestedTransaction(msg) => { HotstuffMessage::RequestedTransaction(msg.try_into()?) }, @@ -192,6 +199,29 @@ impl TryFrom for VoteMess } } +//---------------------------------- RequestMissingForeignBlocksMessage --------------------------------------------// +impl From<&RequestMissingForeignBlocksMessage> for proto::consensus::RequestMissingForeignBlocksMessage { + fn from(msg: &RequestMissingForeignBlocksMessage) -> Self { + Self { + epoch: msg.epoch.as_u64(), + from: msg.from, + to: msg.to, + } + } +} + +impl TryFrom for RequestMissingForeignBlocksMessage { + type Error = anyhow::Error; + + fn try_from(value: proto::consensus::RequestMissingForeignBlocksMessage) -> Result { + Ok(RequestMissingForeignBlocksMessage { + epoch: Epoch(value.epoch), + from: value.from, + to: value.to, + }) + } +} + //---------------------------------- RequestMissingTransactionsMessage --------------------------------------------// impl From<&RequestMissingTransactionsMessage> for proto::consensus::RequestMissingTransactionsMessage { fn from(msg: &RequestMissingTransactionsMessage) -> Self {