Skip to content

Commit

Permalink
feat: request foreign blocks (#760)
Browse files Browse the repository at this point in the history
Description
---
When VN receives foreign proposal that was not expected, it requests all
the missing proposals. They are send in order. So the node can process
them as they come.

Motivation and Context
---

How Has This Been Tested?
---
Not tested.

What process can a PR reviewer use to test or verify this change?
---
You can change the code to VN to not receive foreign proposal and then
request it again later when another foreign proposal comes in.


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
Cifko authored Dec 11, 2023
1 parent a0a0f1f commit 7a59c4d
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 10 deletions.
2 changes: 2 additions & 0 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,6 @@ pub enum ProposalValidationError {
},
#[error("Proposed block {block_id} {height} already has been processed")]
BlockAlreadyProcessed { block_id: BlockId, height: NodeHeight },
#[error("Internal channel send error when {context}")]
InternalChannelClosed { context: &'static str },
}
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod on_ready_to_vote_on_local_block;
mod on_receive_foreign_proposal;
mod on_receive_local_proposal;
mod on_receive_new_view;
mod on_receive_request_missing_foreign_blocks;
mod on_receive_request_missing_transactions;
mod on_receive_requested_transactions;
mod on_receive_vote;
Expand Down
29 changes: 24 additions & 5 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use tari_dan_storage::{
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::mpsc;

use crate::{
hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle, ProposalValidationError},
messages::ProposalMessage,
messages::{HotstuffMessage, ProposalMessage, RequestMissingForeignBlocksMessage},
traits::ConsensusSpec,
};

Expand All @@ -31,6 +32,7 @@ pub struct OnReceiveForeignProposalHandler<TConsensusSpec: ConsensusSpec> {
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage<TConsensusSpec::Addr>)>,
}

impl<TConsensusSpec> OnReceiveForeignProposalHandler<TConsensusSpec>
Expand All @@ -42,13 +44,15 @@ where TConsensusSpec: ConsensusSpec
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage<TConsensusSpec::Addr>)>,
) -> Self {
Self {
store,
epoch_manager,
transaction_pool,
pacemaker,
foreign_receive_counter,
tx_leader,
}
}

Expand All @@ -74,7 +78,8 @@ where TConsensusSpec: ConsensusSpec
.get_committee_shard(block.epoch(), vn.shard_key)
.await?;
let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?;
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())
.await?;
// Is this ok? Can foreign node send invalid block that should still increment the counter?
self.foreign_receive_counter.increment(&committee_shard.bucket());
self.store.with_write_tx(|tx| {
Expand Down Expand Up @@ -153,7 +158,7 @@ where TConsensusSpec: ConsensusSpec
Ok(())
}

fn validate_proposed_block(
async fn validate_proposed_block(
&self,
from: &TConsensusSpec::Addr,
candidate_block: &Block<TConsensusSpec::Addr>,
Expand All @@ -172,8 +177,22 @@ where TConsensusSpec: ConsensusSpec
};
let current_index = self.foreign_receive_counter.get_index(&foreign_bucket);
if current_index + 1 != incoming_index {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was
{incoming_index}", expected_index = current_index + 1);
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was {incoming_index}", expected_index = current_index + 1);
if current_index < incoming_index {
self.tx_leader
.send((
from.clone(),
HotstuffMessage::RequestMissingForeignBlocks(RequestMissingForeignBlocksMessage {
epoch: candidate_block.epoch(),
from: current_index + 1,
to: incoming_index,
}),
))
.await
.map_err(|_| ProposalValidationError::InternalChannelClosed {
context: "tx_leader in OnNextSyncViewHandler::send_to_leader",
})?;
}
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use log::*;
use tari_dan_storage::{StateStore, StateStoreReadTransaction};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::mpsc;

use crate::{
hotstuff::error::HotStuffError,
messages::{HotstuffMessage, ProposalMessage, RequestMissingForeignBlocksMessage},
traits::ConsensusSpec,
};

const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::on_receive_request_missing_transactions";

pub struct OnReceiveRequestMissingForeignBlocksHandler<TConsensusSpec: ConsensusSpec> {
store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
tx_request_missing_foreign_blocks: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage<TConsensusSpec::Addr>)>,
}

impl<TConsensusSpec> OnReceiveRequestMissingForeignBlocksHandler<TConsensusSpec>
where TConsensusSpec: ConsensusSpec
{
pub fn new(
store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
tx_request_missing_foreign_blocks: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage<TConsensusSpec::Addr>)>,
) -> Self {
Self {
store,
epoch_manager,
tx_request_missing_foreign_blocks,
}
}

pub async fn handle(
&mut self,
from: TConsensusSpec::Addr,
msg: RequestMissingForeignBlocksMessage,
) -> Result<(), HotStuffError> {
debug!(target: LOG_TARGET, "{} is requesting {}..{} missing blocks from epoch {}", from, msg.from, msg.to, msg.epoch);
let foreign_shard = self
.epoch_manager
.get_committee_shard_by_validator_address(msg.epoch, &from)
.await?;
let missing_blocks = self
.store
.with_read_tx(|tx| tx.blocks_get_foreign_ids(foreign_shard.bucket(), msg.from, msg.to))?;
for block in missing_blocks {
// We send the proposal back to the requester via hotstuff, so they follow the normal path including
// validation.
self.tx_request_missing_foreign_blocks
.send((
from.clone(),
HotstuffMessage::ForeignProposal(ProposalMessage { block: block.clone() }),
))
.await
.map_err(|_| HotStuffError::InternalChannelClosed {
context: "tx_leader in OnReceiveRequestMissingForeignBlocksHandler::handle",
})?;
}
Ok(())
}
}
17 changes: 16 additions & 1 deletion dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ use tari_shutdown::ShutdownSignal;
use tari_transaction::{Transaction, TransactionId};
use tokio::sync::{broadcast, mpsc};

use super::{on_receive_requested_transactions::OnReceiveRequestedTransactions, proposer::Proposer};
use super::{
on_receive_request_missing_foreign_blocks::OnReceiveRequestMissingForeignBlocksHandler,
on_receive_requested_transactions::OnReceiveRequestedTransactions,
proposer::Proposer,
};
use crate::{
hotstuff::{
common::CommitteeAndMessage,
Expand Down Expand Up @@ -56,6 +60,7 @@ pub struct HotstuffWorker<TConsensusSpec: ConsensusSpec> {
on_receive_foreign_proposal: OnReceiveForeignProposalHandler<TConsensusSpec>,
on_receive_vote: OnReceiveVoteHandler<TConsensusSpec>,
on_receive_new_view: OnReceiveNewViewHandler<TConsensusSpec>,
on_receive_request_missing_foreign_blocks: OnReceiveRequestMissingForeignBlocksHandler<TConsensusSpec>,
on_receive_request_missing_txs: OnReceiveRequestMissingTransactions<TConsensusSpec>,
on_receive_requested_txs: OnReceiveRequestedTransactions<TConsensusSpec>,
on_propose: OnPropose<TConsensusSpec>,
Expand Down Expand Up @@ -139,6 +144,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
transaction_pool.clone(),
pacemaker.clone_handle(),
foreign_receive_counter,
tx_leader.clone(),
),
on_receive_vote: OnReceiveVoteHandler::new(vote_receiver.clone()),
on_receive_new_view: OnReceiveNewViewHandler::new(
Expand All @@ -148,6 +154,11 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
pacemaker.clone_handle(),
vote_receiver,
),
on_receive_request_missing_foreign_blocks: OnReceiveRequestMissingForeignBlocksHandler::new(
state_store.clone(),
epoch_manager.clone(),
tx_leader.clone(),
),
on_receive_request_missing_txs: OnReceiveRequestMissingTransactions::new(
state_store.clone(),
tx_leader.clone(),
Expand Down Expand Up @@ -474,6 +485,10 @@ where TConsensusSpec: ConsensusSpec
);
Ok(())
},
HotstuffMessage::RequestMissingForeignBlocks(msg) => log_err(
"on_receive_request_missing_foreign_blocks",
self.on_receive_request_missing_foreign_blocks.handle(from, msg).await,
),
}
}

Expand Down
14 changes: 13 additions & 1 deletion dan_layer/consensus/src/messages/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ use std::fmt::Display;
use serde::Serialize;
use tari_dan_common_types::Epoch;

use super::{NewViewMessage, ProposalMessage, RequestedTransactionMessage, VoteMessage};
use super::{
NewViewMessage,
ProposalMessage,
RequestMissingForeignBlocksMessage,
RequestedTransactionMessage,
VoteMessage,
};
use crate::messages::{RequestMissingTransactionsMessage, SyncRequestMessage, SyncResponseMessage};

// Serialize is implemented for the message logger
Expand All @@ -20,6 +26,7 @@ pub enum HotstuffMessage<TAddr> {
RequestedTransaction(RequestedTransactionMessage),
SyncRequest(SyncRequestMessage),
SyncResponse(SyncResponseMessage<TAddr>),
RequestMissingForeignBlocks(RequestMissingForeignBlocksMessage),
}

impl<TAddr> HotstuffMessage<TAddr> {
Expand All @@ -33,6 +40,7 @@ impl<TAddr> HotstuffMessage<TAddr> {
HotstuffMessage::RequestedTransaction(_) => "RequestedTransaction",
HotstuffMessage::SyncRequest(_) => "SyncRequest",
HotstuffMessage::SyncResponse(_) => "SyncResponse",
HotstuffMessage::RequestMissingForeignBlocks(_) => "RequestMissingForeignBlocks",
}
}

Expand All @@ -46,6 +54,7 @@ impl<TAddr> HotstuffMessage<TAddr> {
Self::RequestedTransaction(msg) => msg.epoch,
Self::SyncRequest(msg) => msg.epoch,
Self::SyncResponse(msg) => msg.epoch,
Self::RequestMissingForeignBlocks(msg) => msg.epoch,
}
}

Expand All @@ -70,6 +79,9 @@ impl<TAddr> Display for HotstuffMessage<TAddr> {
HotstuffMessage::RequestedTransaction(msg) => write!(f, "RequestedTransaction({})", msg.transactions.len()),
HotstuffMessage::SyncRequest(msg) => write!(f, "SyncRequest({})", msg.high_qc),
HotstuffMessage::SyncResponse(msg) => write!(f, "SyncResponse({} block(s))", msg.blocks.len()),
HotstuffMessage::RequestMissingForeignBlocks(msg) => {
write!(f, "RequestMissingForeignBlocks({}..{})", msg.from, msg.to)
},
}
}
}
3 changes: 3 additions & 0 deletions dan_layer/consensus/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub use proposal::*;
mod vote;
pub use vote::*;

mod request_missing_foreign_blocks;
pub use request_missing_foreign_blocks::*;

mod request_missing_transaction;
pub use request_missing_transaction::*;

Expand Down
12 changes: 12 additions & 0 deletions dan_layer/consensus/src/messages/request_missing_foreign_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use serde::Serialize;
use tari_dan_common_types::Epoch;

#[derive(Debug, Clone, Serialize)]
pub struct RequestMissingForeignBlocksMessage {
pub epoch: Epoch,
pub from: u64,
pub to: u64,
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ CREATE TABLE foreign_receive_counters
created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE blocks_foreign_id_mapping
(
id integer not NULL primary key AUTOINCREMENT,
foreign_bucket integer not NULL,
foreign_index integer not NULL,
block_id text not NULL,
created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (block_id) REFERENCES blocks (block_id)
);

-- Debug Triggers
CREATE TABLE transaction_pool_history
(
Expand Down
40 changes: 39 additions & 1 deletion dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use diesel::{
use log::*;
use serde::{de::DeserializeOwned, Serialize};
use tari_common_types::types::FixedHash;
use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, ShardId};
use tari_dan_common_types::{shard_bucket::ShardBucket, Epoch, NodeAddressable, NodeHeight, ShardId};
use tari_dan_storage::{
consensus_models::{
Block,
Expand Down Expand Up @@ -580,6 +580,44 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
block.try_convert(qc)
}

fn blocks_get_foreign_ids(
&mut self,
bucket: ShardBucket,
from: u64,
to: u64,
) -> Result<Vec<Block<TAddr>>, StorageError> {
use crate::schema::{blocks, blocks_foreign_id_mapping, quorum_certificates};
// TODO: how slow is this? is it worth splitting into 2 queries?
let results = blocks::table
.left_join(blocks_foreign_id_mapping::table.on(blocks::block_id.eq(blocks_foreign_id_mapping::block_id)))
.left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id)))
.filter(blocks_foreign_id_mapping::foreign_bucket.eq(i64::from(bucket.as_u32())))
.filter(blocks_foreign_id_mapping::foreign_index.ge(from as i64))
.filter(blocks_foreign_id_mapping::foreign_index.le(to as i64))
.select((blocks::all_columns, quorum_certificates::all_columns.nullable()))
.order_by(blocks_foreign_id_mapping::foreign_index.asc())
.get_results::<(sql_models::Block, Option<sql_models::QuorumCertificate>)>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "blocks_all_after_height",
source: e,
})?;

results
.into_iter()
.map(|(block, qc)| {
let qc = qc.ok_or_else(|| SqliteStorageError::DbInconsistency {
operation: "blocks_get_foreign_ids",
details: format!(
"block {} references non-existent quorum certificate {}",
block.block_id, block.qc_id
),
})?;

block.try_convert(qc)
})
.collect()
}

fn blocks_get_tip(&mut self) -> Result<Block<TAddr>, StorageError> {
use crate::schema::{blocks, quorum_certificates};

Expand Down
11 changes: 11 additions & 0 deletions dan_layer/state_store_sqlite/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ diesel::table! {
}
}

diesel::table! {
blocks_foreign_id_mapping (id) {
id -> Integer,
foreign_bucket -> BigInt,
foreign_index -> BigInt,
block_id -> Text,
created_at -> Timestamp,
}
}

diesel::table! {
last_executed (id) {
id -> Integer,
Expand Down Expand Up @@ -278,6 +288,7 @@ diesel::table! {

diesel::allow_tables_to_appear_in_same_query!(
blocks,
blocks_foreign_id_mapping,
high_qcs,
last_executed,
last_proposed,
Expand Down
Loading

0 comments on commit 7a59c4d

Please sign in to comment.