fix(consensus)!: use temporary status updates for blocks > locked block (#706)

Description
---
- Creates state updates per block, per transaction, as blocks are
processed, to allow for forks (see the sketch after this list).
- Changes the sync protocol to send many smaller messages, because the
substate data for full blocks exceeded the 4 MB RPC limit and prevented
sync.
- Only locks substate objects once a block has been justified (i.e. is a
new leaf block).
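
As a rough illustration of the first point, here is a minimal sketch of tracking pending substate changes per block and per transaction, so a forked-out block's changes can be discarded without touching committed state. The types and method names are hypothetical, not the actual implementation in this PR:

```rust
use std::collections::HashMap;

// Hypothetical identifiers, for illustration only.
type BlockId = [u8; 32];
type TransactionId = [u8; 32];
type SubstateChange = String; // stand-in for the real substate diff type

/// Pending (uncommitted) state updates, tracked per block and per transaction
/// so that a forked-out block's updates can be dropped wholesale.
#[derive(Default)]
struct PendingStateUpdates {
    updates: HashMap<BlockId, HashMap<TransactionId, Vec<SubstateChange>>>,
}

impl PendingStateUpdates {
    fn record(&mut self, block: BlockId, tx: TransactionId, change: SubstateChange) {
        self.updates
            .entry(block)
            .or_default()
            .entry(tx)
            .or_default()
            .push(change);
    }

    /// Called when a block above the locked block is forked out.
    fn discard_block(&mut self, block: &BlockId) {
        self.updates.remove(block);
    }

    /// Called once a block is committed: hand its updates to the persistent
    /// store and stop tracking them here.
    fn take_for_commit(&mut self, block: &BlockId) -> HashMap<TransactionId, Vec<SubstateChange>> {
        self.updates.remove(block).unwrap_or_default()
    }
}
```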

Motivation and Context
---
Up to two justified blocks can be forked out. Because we process blocks as
they arrive to determine which commands to propose in the next block, and
those commands require state changes, a fork can result in invalid new
proposals. Forks occur in non-malicious cases (e.g. stress tests) due to
high message volumes, which can prevent a node from both sending and
receiving proposal messages before the leader timeout, though this is
notably improved after #681 and #693. After a leader failure, new
proposals with dummy block parents supersede previously processed blocks.
This PR tracks state changes above the locked block and uses the correct
transaction phases/stages from the locked block and the current leaf when
building new proposals.

Stress testing (up to 1000 transactions in a batch) went considerably
better than before, and the chain always continued.
Some issues were encountered when a node fell behind during stress tests
and switched to sync: some nodes were left with already-finalised
transactions in their pool (TBD bug) and would re-propose them when it
was their turn to be leader. That block would not be voted on, and the
chain would only continue after a leader failure. Fixing this bug will be
a focus of subsequent PRs.

How Has This Been Tested?
---
Existing consensus tests, manual testing with 8 VNs under stress, and the
existing cucumber scenarios.

What process can a PR reviewer use to test or verify this change?
---
Run a multi-node network

Breaking Changes
---

- [ ] None
- [x] Requires data directory to be deleted
- [ ] Other - Please specify
sdbondi authored Oct 9, 2023
1 parent 0c84b25 commit d48c8f7
Showing 61 changed files with 2,015 additions and 663 deletions.
@@ -80,7 +80,7 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
let result = match processor.execute(transaction.clone()) {
Ok(result) => result,
Err(err) => ExecuteResult {
finalize: FinalizeResult::reject(tx_id, RejectReason::ExecutionFailure(err.to_string())),
finalize: FinalizeResult::new_rejected(tx_id, RejectReason::ExecutionFailure(err.to_string())),
transaction_failure: None,
fee_receipt: None,
},
6 changes: 3 additions & 3 deletions applications/tari_dan_wallet_daemon/src/handlers/rpc.rs
@@ -27,7 +27,7 @@ pub async fn handle_discover(

pub async fn handle_login_request(
context: &HandlerContext,
_: Option<String>,
_token: Option<String>,
auth_request: AuthLoginRequest,
) -> Result<AuthLoginResponse, anyhow::Error> {
let jwt = context.wallet_sdk().jwt_api();
@@ -42,7 +42,7 @@ pub async fn handle_login_request(

pub async fn handle_login_accept(
context: &HandlerContext,
_: Option<String>,
_token: Option<String>,
auth_accept_request: AuthLoginAcceptRequest,
) -> Result<AuthLoginAcceptResponse, anyhow::Error> {
let jwt = context.wallet_sdk().jwt_api();
@@ -52,7 +52,7 @@ pub async fn handle_login_accept(

pub async fn handle_login_deny(
context: &HandlerContext,
_: Option<String>,
_token: Option<String>,
auth_deny_request: AuthLoginDenyRequest,
) -> Result<AuthLoginDenyResponse, anyhow::Error> {
let jwt = context.wallet_sdk().jwt_api();
10 changes: 2 additions & 8 deletions applications/tari_signaling_server/src/data.rs
@@ -87,10 +87,7 @@ impl Data {
}

pub fn add_offer_ice_candidate(&mut self, id: u64, ice_candidate: RTCIceCandidateInit) {
self.offer_ice_candidates
.entry(id)
.or_insert_with(Vec::new)
.push(ice_candidate);
self.offer_ice_candidates.entry(id).or_default().push(ice_candidate);
}

pub fn get_offer_ice_candidates(&self, id: u64) -> Result<&Vec<RTCIceCandidateInit>> {
@@ -100,10 +97,7 @@ }
}

pub fn add_answer_ice_candidate(&mut self, id: u64, ice_candidate: RTCIceCandidateInit) {
self.answer_ice_candidates
.entry(id)
.or_insert_with(Vec::new)
.push(ice_candidate);
self.answer_ice_candidates.entry(id).or_default().push(ice_candidate);
}

pub fn get_answer_ice_candidates(&self, id: u64) -> Result<&Vec<RTCIceCandidateInit>> {
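
The two hunks above replace `.entry(id).or_insert_with(Vec::new)` with the equivalent, more idiomatic `.entry(id).or_default()`. A small standalone illustration of the idiom:

```rust
use std::collections::HashMap;

fn main() {
    let mut candidates: HashMap<u64, Vec<String>> = HashMap::new();

    // Before: explicitly insert an empty Vec when the key is missing.
    candidates.entry(1).or_insert_with(Vec::new).push("a".to_string());

    // After: or_default() does the same thing via Vec's Default impl.
    candidates.entry(1).or_default().push("b".to_string());

    assert_eq!(candidates[&1], vec!["a".to_string(), "b".to_string()]);
}
```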
42 changes: 21 additions & 21 deletions applications/tari_validator_node/src/bootstrap.rs
@@ -40,7 +40,6 @@ use tari_common::{
};
use tari_common_types::types::PublicKey;
use tari_comms::{protocol::rpc::RpcServer, types::CommsPublicKey, CommsNode, NodeIdentity, UnspawnedCommsNode};
use tari_consensus::hotstuff::HotstuffEvent;
use tari_dan_app_utilities::{
base_layer_scanner,
consensus_constants::ConsensusConstants,
@@ -76,8 +75,8 @@ use tokio::{sync::mpsc, task::JoinHandle};
use crate::{
comms,
consensus,
consensus::ConsensusHandle,
dry_run_transaction_processor::DryRunTransactionProcessor,
event_subscription::EventSubscription,
p2p::{
create_tari_validator_node_rpc_service,
services::{
@@ -194,9 +193,23 @@ pub async fn spawn_services(

let validator_node_client_factory = TariCommsValidatorNodeClientFactory::new(comms.connectivity());

// Consensus
let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let (consensus_join_handle, consensus_handle, rx_consensus_to_mempool) = consensus::spawn(
state_store.clone(),
node_identity.clone(),
epoch_manager.clone(),
rx_executed_transaction,
rx_consensus_message,
outbound_messaging.clone(),
validator_node_client_factory.clone(),
shutdown.clone(),
)
.await;
handles.push(consensus_join_handle);

// Mempool
let virtual_substate_manager = VirtualSubstateManager::new(state_store.clone(), epoch_manager.clone());
let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let scanner = SubstateScanner::new(epoch_manager.clone(), validator_node_client_factory.clone());
let substate_resolver = TariSubstateResolver::new(
state_store.clone(),
@@ -206,7 +219,7 @@
);
let (mempool, join_handle) = mempool::spawn(
rx_new_transaction_message,
outbound_messaging.clone(),
outbound_messaging,
tx_executed_transaction,
epoch_manager.clone(),
node_identity.clone(),
@@ -219,6 +232,8 @@
),
create_mempool_after_execute_validator(state_store.clone()),
state_store.clone(),
rx_consensus_to_mempool,
consensus_handle.clone(),
);
handles.push(join_handle);

@@ -236,21 +251,6 @@
);
handles.push(join_handle);

// Consensus
let (consensus_handle, consensus_events) = consensus::spawn(
state_store.clone(),
node_identity.clone(),
epoch_manager.clone(),
rx_executed_transaction,
rx_consensus_message,
outbound_messaging,
mempool.clone(),
validator_node_client_factory.clone(),
shutdown.clone(),
)
.await;
handles.push(consensus_handle);

let comms = setup_p2p_rpc(
config,
comms,
@@ -289,7 +289,7 @@
mempool,
epoch_manager,
template_manager: template_manager_service,
hotstuff_events: consensus_events,
consensus_handle,
global_db,
state_store,
dry_run_transaction_processor,
@@ -321,7 +321,7 @@ pub struct Services {
pub mempool: MempoolHandle,
pub epoch_manager: EpochManagerHandle,
pub template_manager: TemplateManagerHandle,
pub hotstuff_events: EventSubscription<HotstuffEvent>,
pub consensus_handle: ConsensusHandle,
pub global_db: GlobalDb<SqliteGlobalDbAdapter>,
pub dry_run_transaction_processor: DryRunTransactionProcessor,
pub validator_node_client_factory: TariCommsValidatorNodeClientFactory,
27 changes: 15 additions & 12 deletions applications/tari_validator_node/src/comms/deserialize.rs
@@ -108,17 +108,19 @@ where
let decoded_msg = proto::network::DanMessage::decode(&mut body)?;
let message_tag = decoded_msg.message_tag.clone();
let msg = DanMessage::try_from(decoded_msg)?;
let peer = peer_manager
.find_by_node_id(&source_peer)
.await?
.ok_or_else(|| anyhow::anyhow!("Could not find peer with node id {}", source_peer))?;

log::info!(
target: LOG_TARGET,
"📨 Rx: {} ({} bytes) from {}",
"📨 Rx: {} ({} bytes) from {:15}",
msg.as_type_str(),
body_len,
source_peer
peer.public_key
);
let peer = peer_manager
.find_by_node_id(&source_peer)
.await?
.ok_or_else(|| anyhow::anyhow!("Could not find peer with node id {}", source_peer))?;

logger.log_inbound_message(peer.public_key.as_bytes(), msg.as_type_str(), &message_tag, &msg);
let mut svc = next_service.ready_oneshot().await?;
svc.call((peer.public_key, msg)).await?;
@@ -154,17 +156,18 @@ where
let body_len = body.len();
let decoded_msg = proto::consensus::HotStuffMessage::decode(&mut body)?;
let msg = HotstuffMessage::try_from(decoded_msg)?;
let peer = peer_manager
.find_by_node_id(&source_peer)
.await?
.ok_or_else(|| anyhow::anyhow!("Could not find peer with node id {}", source_peer))?;
log::info!(
target: LOG_TARGET,
"📨 Rx: {} ({} bytes) from {}",
"📨 Rx: {} ({} bytes) from {:15}",
msg.as_type_str(),
body_len,
source_peer
peer.public_key
);
let peer = peer_manager
.find_by_node_id(&source_peer)
.await?
.ok_or_else(|| anyhow::anyhow!("Could not find peer with node id {}", source_peer))?;

logger.log_inbound_message(peer.public_key.as_bytes(), msg.as_type_str(), "", &msg);
let mut svc = next_service.ready_oneshot().await?;
svc.call((peer.public_key, msg)).await?;
33 changes: 33 additions & 0 deletions applications/tari_validator_node/src/consensus/handle.rs
@@ -0,0 +1,33 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::{ConsensusCurrentState, HotstuffEvent};
use tokio::sync::{broadcast, watch};

use crate::event_subscription::EventSubscription;

#[derive(Debug, Clone)]
pub struct ConsensusHandle {
rx_current_state: watch::Receiver<ConsensusCurrentState>,
events_subscription: EventSubscription<HotstuffEvent>,
}

impl ConsensusHandle {
pub(super) fn new(
rx_current_state: watch::Receiver<ConsensusCurrentState>,
events_subscription: EventSubscription<HotstuffEvent>,
) -> Self {
Self {
rx_current_state,
events_subscription,
}
}

pub fn subscribe_to_hotstuff_events(&mut self) -> broadcast::Receiver<HotstuffEvent> {
self.events_subscription.subscribe()
}

pub fn get_current_state(&self) -> ConsensusCurrentState {
*self.rx_current_state.borrow()
}
}
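
A sketch of how a caller might use the new handle (mirroring the `dan_node.rs` change further down; the surrounding async function is illustrative only and assumes `ConsensusHandle` is in scope):

```rust
// Illustrative usage of the handle defined above; not part of the diff.
async fn watch_consensus(mut consensus_handle: ConsensusHandle) {
    // Cheap, non-blocking read of the most recently published state.
    let _current = consensus_handle.get_current_state();

    // Each subscriber gets an independent broadcast receiver.
    let mut events = consensus_handle.subscribe_to_hotstuff_events();
    while let Ok(_event) = events.recv().await {
        // React to committed blocks, leader failures, etc.
    }
}
```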
36 changes: 21 additions & 15 deletions applications/tari_validator_node/src/consensus/mod.rs
@@ -7,7 +7,7 @@ use tari_common_types::types::PublicKey;
use tari_comms::{types::CommsPublicKey, NodeIdentity};
use tari_comms_rpc_state_sync::CommsRpcStateSyncManager;
use tari_consensus::{
hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffEvent, HotstuffWorker},
hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffWorker},
messages::HotstuffMessage,
};
use tari_dan_common_types::committee::Committee;
@@ -19,7 +19,7 @@ use tari_state_store_sqlite::SqliteStateStore;
use tari_transaction::{Transaction, TransactionId};
use tari_validator_node_rpc::client::TariCommsValidatorNodeClientFactory;
use tokio::{
sync::{broadcast, mpsc},
sync::{broadcast, mpsc, watch},
task::JoinHandle,
};

@@ -31,25 +31,31 @@ use crate::{
state_manager::TariStateManager,
},
event_subscription::EventSubscription,
p2p::services::{mempool::MempoolHandle, messaging::OutboundMessaging},
p2p::services::messaging::OutboundMessaging,
};

mod handle;
mod leader_selection;
mod signature_service;
mod spec;
mod state_manager;

pub use handle::*;

pub async fn spawn(
store: SqliteStateStore<PublicKey>,
node_identity: Arc<NodeIdentity>,
epoch_manager: EpochManagerHandle,
rx_new_transactions: mpsc::Receiver<TransactionId>,
rx_hs_message: mpsc::Receiver<(CommsPublicKey, HotstuffMessage<PublicKey>)>,
outbound_messaging: OutboundMessaging,
mempool: MempoolHandle,
client_factory: TariCommsValidatorNodeClientFactory,
shutdown_signal: ShutdownSignal,
) -> (JoinHandle<Result<(), anyhow::Error>>, EventSubscription<HotstuffEvent>) {
) -> (
JoinHandle<Result<(), anyhow::Error>>,
ConsensusHandle,
mpsc::UnboundedReceiver<Transaction>,
) {
let (tx_broadcast, rx_broadcast) = mpsc::channel(10);
let (tx_leader, rx_leader) = mpsc::channel(10);
let (tx_mempool, rx_mempool) = mpsc::unbounded_channel();
@@ -59,7 +65,7 @@ pub async fn spawn(
let leader_strategy = RoundRobinLeaderStrategy::new();
let transaction_pool = TransactionPool::new();
let state_manager = TariStateManager::new();
let (tx_events, _) = broadcast::channel(100);
let (tx_hotstuff_events, _) = broadcast::channel(100);

let epoch_events = epoch_manager.subscribe().await.unwrap();

@@ -75,38 +81,40 @@ pub async fn spawn(
transaction_pool,
tx_broadcast,
tx_leader,
tx_events.clone(),
tx_hotstuff_events.clone(),
tx_mempool,
shutdown_signal.clone(),
);

let (tx_current_state, rx_current_state) = watch::channel(Default::default());
let context = ConsensusWorkerContext {
epoch_manager: epoch_manager.clone(),
epoch_events,
hotstuff: hotstuff_worker,
state_sync: CommsRpcStateSyncManager::new(epoch_manager, store, client_factory),
tx_current_state,
};

let handle = ConsensusWorker::new(shutdown_signal).spawn(context);

ConsensusMessageWorker {
rx_broadcast,
rx_leader,
rx_mempool,
outbound_messaging,
mempool,
}
.spawn();

(handle, EventSubscription::new(tx_events))
(
handle,
ConsensusHandle::new(rx_current_state, EventSubscription::new(tx_hotstuff_events)),
rx_mempool,
)
}

struct ConsensusMessageWorker {
rx_broadcast: mpsc::Receiver<(Committee<CommsPublicKey>, HotstuffMessage<CommsPublicKey>)>,
rx_leader: mpsc::Receiver<(CommsPublicKey, HotstuffMessage<CommsPublicKey>)>,
rx_mempool: mpsc::UnboundedReceiver<Transaction>,
outbound_messaging: OutboundMessaging,
mempool: MempoolHandle,
}

impl ConsensusMessageWorker {
@@ -126,9 +134,7 @@ impl ConsensusMessageWorker {
.await
.ok();
},
Some(tx) = self.rx_mempool.recv() => {
self.mempool.submit_transaction_without_propagating(tx).await.ok();
},

else => break,
}
}
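
The consensus worker now publishes its current state over a `tokio::sync::watch` channel, and `ConsensusHandle::get_current_state` reads it without blocking. A minimal, self-contained sketch of that pattern (the `WorkerState` enum here is illustrative, not the real `ConsensusCurrentState`):

```rust
use tokio::sync::watch;

#[derive(Debug, Clone, Copy, PartialEq, Default)]
enum WorkerState {
    #[default]
    Idle,
    Syncing,
    Running,
}

fn main() {
    // Producer side: held by the worker and updated as its state changes.
    let (tx_current_state, rx_current_state) = watch::channel(WorkerState::default());

    tx_current_state.send(WorkerState::Running).unwrap();

    // Consumer side: borrow() returns the latest value without waiting.
    assert_eq!(*rx_current_state.borrow(), WorkerState::Running);
}
```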
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/dan_node.rs
@@ -44,7 +44,7 @@ impl DanNode {
}

pub async fn start(mut self, mut shutdown: ShutdownSignal) -> Result<(), anyhow::Error> {
let mut hotstuff_events = self.services.hotstuff_events.subscribe();
let mut hotstuff_events = self.services.consensus_handle.subscribe_to_hotstuff_events();

let mut connectivity_events = self.services.comms.connectivity().get_event_subscription();

1 change: 1 addition & 0 deletions applications/tari_validator_node/src/event_subscription.rs
@@ -27,6 +27,7 @@ use tokio::sync::broadcast;
///
/// We hold a sender because if we held a receiver then the broadcast buffer would always fill up because the receiver
/// isn't reading off of it.
#[derive(Debug)]
pub struct EventSubscription<T>(broadcast::Sender<T>);

impl<T> EventSubscription<T> {
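
A simplified sketch of the pattern described in the doc comment above (hold the `broadcast::Sender` and mint a fresh receiver per subscriber); this is not the exact `EventSubscription` implementation:

```rust
use tokio::sync::broadcast;

#[derive(Debug, Clone)]
struct Subscription<T>(broadcast::Sender<T>);

impl<T: Clone> Subscription<T> {
    fn new(sender: broadcast::Sender<T>) -> Self {
        Self(sender)
    }

    // Each call creates an independent receiver that only sees events sent
    // from this point on, so an idle handle never fills the channel buffer.
    fn subscribe(&self) -> broadcast::Receiver<T> {
        self.0.subscribe()
    }
}

fn main() {
    let (tx, _) = broadcast::channel::<String>(16);
    let subscription = Subscription::new(tx);
    let mut rx = subscription.subscribe();

    subscription.0.send("hotstuff event".to_string()).unwrap();
    assert_eq!(rx.try_recv().unwrap(), "hotstuff event");
}
```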