diff --git a/consensus/src/aggregator.rs b/consensus/src/aggregator.rs index 639ba91f52..b31b3f31b7 100644 --- a/consensus/src/aggregator.rs +++ b/consensus/src/aggregator.rs @@ -242,7 +242,7 @@ mod tests { }; use hex::FromHex; use node_data::ledger::{Header, Seed}; - use std::collections::HashMap; + use std::time::Duration; impl Aggregator { pub fn get_total(&self, step: u8, vote: Vote) -> Option { @@ -301,7 +301,7 @@ mod tests { pubkey_bls, secret_key, &tip_header, - HashMap::new(), + Duration::default(), vec![], ); diff --git a/consensus/src/commons.rs b/consensus/src/commons.rs index 7452b86874..670a660999 100644 --- a/consensus/src/commons.rs +++ b/consensus/src/commons.rs @@ -8,19 +8,15 @@ // Provisioners, the BidList, the Seed and the Hash. use node_data::ledger::*; -use std::collections::HashMap; use std::time::Duration; use execution_core::signatures::bls::SecretKey as BlsSecretKey; use node_data::bls::PublicKey; use node_data::message::{AsyncQueue, Message, Payload}; -use node_data::StepName; use crate::operations::Voter; -pub type TimeoutSet = HashMap; - #[derive(Clone, Default, Debug)] pub struct RoundUpdate { // Current round number of the ongoing consensus @@ -36,7 +32,7 @@ pub struct RoundUpdate { att_voters: Vec, timestamp: u64, - pub base_timeouts: TimeoutSet, + pub base_timeout: Duration, } impl RoundUpdate { @@ -44,7 +40,7 @@ impl RoundUpdate { pubkey_bls: PublicKey, secret_key: BlsSecretKey, tip_header: &Header, - base_timeouts: TimeoutSet, + base_timeout: Duration, att_voters: Vec, ) -> Self { let round = tip_header.height + 1; @@ -56,7 +52,7 @@ impl RoundUpdate { hash: tip_header.hash, seed: tip_header.seed, timestamp: tip_header.timestamp, - base_timeouts, + base_timeout, att_voters, } } diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index d86cd0f890..b8532f2610 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -182,7 +182,7 @@ impl Consensus { validation_handler, ratification_handler, proposal_handler, - ru.base_timeouts.clone(), + ru.base_timeout, ); let (prev_block_hash, saved_iter) = diff --git a/consensus/src/errors.rs b/consensus/src/errors.rs index 9172fdd8f0..922f96fae5 100644 --- a/consensus/src/errors.rs +++ b/consensus/src/errors.rs @@ -76,8 +76,6 @@ pub enum OperationError { InvalidEST(anyhow::Error), #[error("failed to verify header {0}")] InvalidHeader(HeaderError), - #[error("Unable to update metrics {0}")] - MetricsUpdate(anyhow::Error), #[error("Invalid Iteration Info {0}")] InvalidIterationInfo(io::Error), #[error("Invalid Faults {0}")] diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index fdbac441e8..d06d0b0b5f 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -139,7 +139,7 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { info!(event = "run event_loop", ?dur, mode = "open_consensus",); dur } else { - let dur = self.iter_ctx.get_timeout(self.step_name()); + let dur = self.iter_ctx.get_timeout(); debug!(event = "run event_loop", ?dur); dur }; @@ -186,7 +186,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { // block is accepted continue; } else { - self.report_elapsed_time().await; return Ok(step_result); } } @@ -470,8 +469,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { &mut self, phase: Arc>, ) { - self.iter_ctx.on_timeout_event(self.step_name()); - if let Some(msg) = phase .lock() .await @@ -536,24 +533,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> { None } - /// Reports step elapsed time to the client - async fn report_elapsed_time(&mut self) { - let elapsed = self - .step_start_time - .take() - .expect("valid start time") - .elapsed(); - - let _ = self - .client - .add_step_elapsed_time( - self.round_update.round, - self.step_name(), - elapsed, - ) - .await; - } - pub(crate) fn get_curr_generator(&self) -> Option { self.iter_ctx.get_generator(self.iteration) } diff --git a/consensus/src/iteration_ctx.rs b/consensus/src/iteration_ctx.rs index 1a39fa1821..5835cf5c0b 100644 --- a/consensus/src/iteration_ctx.rs +++ b/consensus/src/iteration_ctx.rs @@ -4,11 +4,11 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -use crate::commons::{Database, RoundUpdate, TimeoutSet}; -use std::cmp; +use crate::commons::{Database, RoundUpdate}; use crate::config::{ - exclude_next_generator, MAX_STEP_TIMEOUT, TIMEOUT_INCREASE, + exclude_next_generator, MAX_STEP_TIMEOUT, MIN_STEP_TIMEOUT, + TIMEOUT_INCREASE, }; use crate::msg_handler::HandleMsgOutput; use crate::msg_handler::MsgHandler; @@ -23,7 +23,6 @@ use node_data::bls::PublicKeyBytes; use node_data::ledger::Seed; use node_data::message::Message; use std::collections::HashMap; -use std::ops::Add; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; @@ -79,8 +78,8 @@ pub struct IterationCtx { /// iteration of current round pub(crate) committees: RoundCommittees, - /// Implements the adaptive timeout algorithm - timeouts: TimeoutSet, + /// Timeout for each step of the current iteration + step_timeout: Duration, } impl IterationCtx { @@ -92,7 +91,7 @@ impl IterationCtx { Mutex, >, proposal_handler: Arc>>, - timeouts: TimeoutSet, + base_step_timeout: Duration, ) -> Self { Self { round, @@ -101,7 +100,7 @@ impl IterationCtx { validation_handler, ratification_handler, committees: Default::default(), - timeouts, + step_timeout: base_step_timeout, proposal_handler, } } @@ -109,6 +108,15 @@ impl IterationCtx { /// Executed on starting a new iteration, before Proposal step execution pub(crate) fn on_begin(&mut self, iter: u8) { self.iter = iter; + + if iter > 0 { + self.step_timeout += TIMEOUT_INCREASE; + } + + self.step_timeout = self + .step_timeout + .max(MIN_STEP_TIMEOUT) + .min(MAX_STEP_TIMEOUT); } /// Executed on closing an iteration, after Ratification step execution @@ -122,21 +130,9 @@ impl IterationCtx { self.join_set.abort_all(); } - /// Handles an event of a Phase timeout - pub(crate) fn on_timeout_event(&mut self, step_name: StepName) { - let curr_step_timeout = - self.timeouts.get_mut(&step_name).expect("valid timeout"); - - *curr_step_timeout = - cmp::min(MAX_STEP_TIMEOUT, curr_step_timeout.add(TIMEOUT_INCREASE)); - } - - /// Calculates and returns the adjusted timeout for the specified step - pub(crate) fn get_timeout(&self, step_name: StepName) -> Duration { - *self - .timeouts - .get(&step_name) - .expect("valid timeout per step") + /// Return the step timeout for the current iteration + pub(crate) fn get_timeout(&self) -> Duration { + self.step_timeout } fn get_sortition_config( diff --git a/consensus/src/operations.rs b/consensus/src/operations.rs index b6f0351f51..3785ce4d66 100644 --- a/consensus/src/operations.rs +++ b/consensus/src/operations.rs @@ -5,14 +5,12 @@ // Copyright (c) DUSK NETWORK. All rights reserved. use std::fmt; -use std::time::Duration; use node_data::bls::PublicKey; use node_data::bls::PublicKeyBytes; use node_data::ledger::{ Block, Fault, Header, Slash, SpentTransaction, Transaction, }; -use node_data::StepName; use crate::errors::*; @@ -87,12 +85,5 @@ pub trait Operations: Send + Sync { params: CallParams, ) -> Result; - async fn add_step_elapsed_time( - &self, - round: u64, - step_name: StepName, - elapsed: Duration, - ) -> Result<(), OperationError>; - async fn get_block_gas_limit(&self) -> u64; } diff --git a/consensus/src/phase.rs b/consensus/src/phase.rs index 0e2508adec..75833399b9 100644 --- a/consensus/src/phase.rs +++ b/consensus/src/phase.rs @@ -57,7 +57,7 @@ impl Phase { ) -> Result { ctx.set_start_time(); - let timeout = ctx.iter_ctx.get_timeout(ctx.step_name()); + let timeout = ctx.iter_ctx.get_timeout(); debug!(event = "execute_step", ?timeout); // Execute step diff --git a/node/benches/accept.rs b/node/benches/accept.rs index d8d013657c..5742997c26 100644 --- a/node/benches/accept.rs +++ b/node/benches/accept.rs @@ -68,7 +68,7 @@ fn create_step_votes( pk.clone(), sk.clone(), tip_header, - HashMap::default(), + Duration::default(), vec![], ); let sig = match step { diff --git a/node/src/chain/acceptor.rs b/node/src/chain/acceptor.rs index 8a91067ae3..b1f5f7bae5 100644 --- a/node/src/chain/acceptor.rs +++ b/node/src/chain/acceptor.rs @@ -7,7 +7,6 @@ use crate::database::{self, Candidate, Ledger, Mempool, Metadata}; use crate::{vm, Message, Network}; use anyhow::{anyhow, Result}; -use dusk_consensus::commons::TimeoutSet; use dusk_consensus::config::{MAX_STEP_TIMEOUT, MIN_STEP_TIMEOUT}; use dusk_consensus::errors::{ConsensusError, HeaderError}; use dusk_consensus::user::provisioners::{ContextProvisioners, Provisioners}; @@ -26,7 +25,7 @@ use dusk_consensus::operations::Voter; use execution_core::stake::{Withdraw, STAKE_CONTRACT}; use metrics::{counter, gauge, histogram}; use node_data::message::payload::Vote; -use node_data::{get_current_timestamp, Serializable, StepName}; +use node_data::{get_current_timestamp, Serializable}; use std::collections::BTreeMap; use std::sync::{Arc, LazyLock}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -38,10 +37,7 @@ use tracing::{debug, info, warn}; use super::consensus::Task; use crate::chain::header_validation::{verify_faults, Validator}; use crate::chain::metrics::AverageElapsedTime; -use crate::database::rocksdb::{ - MD_AVG_PROPOSAL, MD_AVG_RATIFICATION, MD_AVG_VALIDATION, MD_HASH_KEY, - MD_STATE_ROOT_KEY, -}; +use crate::database::rocksdb::{MD_AVG_STEP, MD_HASH_KEY, MD_STATE_ROOT_KEY}; const CANDIDATES_DELETION_OFFSET: u64 = 10; @@ -249,7 +245,7 @@ impl Acceptor { } pub async fn spawn_task(&self) { let provisioners_list = self.provisioners_list.read().await.clone(); - let base_timeouts = self.adjust_round_base_timeouts().await; + let base_timeout = self.step_avg_timeout().await; let tip = self.tip.read().await.inner().clone(); let tip_block_voters = @@ -260,7 +256,7 @@ impl Acceptor { provisioners_list, &self.db, &self.vm, - base_timeouts, + base_timeout, tip_block_voters, ); } @@ -603,6 +599,12 @@ impl Acceptor { // A fully valid block is accepted, consensus task must be aborted. task.abort_with_wait().await; + self.add_step_avg_time( + tip.inner().header().iteration + 1, + Duration::from_secs(block_time), + ) + .await?; + Self::emit_metrics( tip.inner(), &label, @@ -696,13 +698,13 @@ impl Acceptor { // Restart Consensus. if enable_consensus { - let base_timeouts = self.adjust_round_base_timeouts().await; + let base_timeout = self.step_avg_timeout().await; task.spawn( tip.inner(), provisioners_list.clone(), &self.db, &self.vm, - base_timeouts, + base_timeout, tip_block_voters, ); } @@ -981,13 +983,13 @@ impl Acceptor { let tip_block_voters = self.get_att_voters(provisioners_list.prev(), &tip).await; - let base_timeouts = self.adjust_round_base_timeouts().await; + let base_timeout = self.step_avg_timeout().await; task.spawn( &tip, provisioners_list, &self.db, &self.vm, - base_timeouts, + base_timeout, tip_block_voters, ); } @@ -1049,30 +1051,38 @@ impl Acceptor { self.task.read().await.outbound.clone() } - async fn adjust_round_base_timeouts(&self) -> TimeoutSet { - let mut base_timeout_set = TimeoutSet::new(); + async fn add_step_avg_time( + &self, + number_of_iter: u8, + block_elapsed: Duration, + ) -> Result<()> { + let iter_avg = block_elapsed / number_of_iter as u32; + let step_avg = iter_avg / 3_u32; - base_timeout_set.insert( - StepName::Proposal, - self.read_avg_timeout(MD_AVG_PROPOSAL).await, - ); + let db = self.db.read().await; + db.update(|t| { + let mut metric = match &t.op_read(MD_AVG_STEP)? { + Some(bytes) => AverageElapsedTime::read(&mut &bytes[..]) + .unwrap_or_default(), + None => AverageElapsedTime::default(), + }; - base_timeout_set.insert( - StepName::Validation, - self.read_avg_timeout(MD_AVG_VALIDATION).await, - ); + metric.push_back(step_avg); + debug!(event = "avg_updated", metric = ?metric); - base_timeout_set.insert( - StepName::Ratification, - self.read_avg_timeout(MD_AVG_RATIFICATION).await, - ); + let mut bytes = Vec::new(); + metric.write(&mut bytes)?; - base_timeout_set + t.op_write(MD_AVG_STEP, bytes) + }) + .map_err(|e| anyhow::anyhow!("Cannot update step_avg {e}"))?; + + Ok(()) } - async fn read_avg_timeout(&self, key: &[u8]) -> Duration { + async fn step_avg_timeout(&self) -> Duration { let metric = self.db.read().await.view(|t| { - let bytes = &t.op_read(key)?; + let bytes = &t.op_read(MD_AVG_STEP)?; let metric = match bytes { Some(bytes) => AverageElapsedTime::read(&mut &bytes[..]) .unwrap_or_default(), @@ -1086,12 +1096,12 @@ impl Acceptor { Ok::(metric) }); - metric + let stored_avg = metric .unwrap_or_default() .average() - .unwrap_or(MIN_STEP_TIMEOUT) - .max(MIN_STEP_TIMEOUT) - .min(MAX_STEP_TIMEOUT) + .unwrap_or(MAX_STEP_TIMEOUT); + + stored_avg.max(MIN_STEP_TIMEOUT).min(MAX_STEP_TIMEOUT) } async fn get_prev_block_seed(&self) -> Result { diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index 1adc079140..68e8aefb56 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -7,7 +7,7 @@ use crate::database::{self, Candidate, Mempool, Metadata}; use crate::{vm, Message}; use async_trait::async_trait; -use dusk_consensus::commons::{RoundUpdate, TimeoutSet}; +use dusk_consensus::commons::RoundUpdate; use dusk_consensus::consensus::Consensus; use dusk_consensus::errors::{ConsensusError, HeaderError, OperationError}; use dusk_consensus::operations::{ @@ -24,12 +24,9 @@ use tokio::task::JoinHandle; use tracing::{debug, info, trace, warn}; use crate::chain::header_validation::Validator; -use crate::chain::metrics::AverageElapsedTime; -use crate::database::rocksdb::{ - MD_AVG_PROPOSAL, MD_AVG_RATIFICATION, MD_AVG_VALIDATION, MD_LAST_ITER, -}; +use crate::database::rocksdb::MD_LAST_ITER; use metrics::gauge; -use node_data::{ledger, Serializable, StepName}; +use node_data::ledger; use std::sync::Arc; use std::time::Duration; @@ -99,7 +96,7 @@ impl Task { provisioners_list: ContextProvisioners, db: &Arc>, vm: &Arc>, - base_timeout: TimeoutSet, + base_timeout: Duration, voters: Vec, ) { let current = provisioners_list.to_current(); @@ -120,7 +117,7 @@ impl Task { self.keys.1.clone(), self.keys.0.clone(), tip.header(), - base_timeout.clone(), + base_timeout, voters, ); @@ -354,40 +351,6 @@ impl Operations for Executor { }) } - async fn add_step_elapsed_time( - &self, - _round: u64, - step_name: StepName, - elapsed: Duration, - ) -> Result<(), OperationError> { - let db_key = match step_name { - StepName::Proposal => MD_AVG_PROPOSAL, - StepName::Validation => MD_AVG_VALIDATION, - StepName::Ratification => MD_AVG_RATIFICATION, - }; - - let db = self.db.read().await; - let _ = db - .update(|t| { - let mut metric = match &t.op_read(db_key)? { - Some(bytes) => AverageElapsedTime::read(&mut &bytes[..]) - .unwrap_or_default(), - None => AverageElapsedTime::default(), - }; - - metric.push_back(elapsed); - debug!(event = "avg_updated", ?step_name, metric = ?metric); - - let mut bytes = Vec::new(); - metric.write(&mut bytes)?; - - t.op_write(db_key, bytes) - }) - .map_err(OperationError::MetricsUpdate)?; - - Ok(()) - } - async fn get_block_gas_limit(&self) -> u64 { self.vm.read().await.get_block_gas_limit() } diff --git a/node/src/database/rocksdb.rs b/node/src/database/rocksdb.rs index ea2d665c6c..a4a62c2135 100644 --- a/node/src/database/rocksdb.rs +++ b/node/src/database/rocksdb.rs @@ -50,9 +50,7 @@ const DB_FOLDER_NAME: &str = "chain.db"; // List of supported metadata keys pub const MD_HASH_KEY: &[u8] = b"hash_key"; pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key"; -pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time"; -pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time"; -pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time"; +pub const MD_AVG_STEP: &[u8] = b"avg_step_time"; pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter"; #[derive(Clone)]