Skip to content

Commit

Permalink
Crate compiles without tests. Missing tests and metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
brunoffranca committed Oct 26, 2024
1 parent 8529cdf commit 83df8bf
Show file tree
Hide file tree
Showing 8 changed files with 1,184 additions and 1,236 deletions.
8 changes: 4 additions & 4 deletions node/actors/bft/src/chonky_bft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use zksync_concurrency::{
use zksync_consensus_network::io::ConsensusReq;
use zksync_consensus_roles::validator::{self, ConsensusMsg};

mod commit;
pub(crate) mod commit;
mod misc;
mod new_view;
mod proposal;
pub(crate) mod new_view;
pub(crate) mod proposal;
pub(crate) mod proposer;
mod timeout;
pub(crate) mod timeout;

#[cfg(test)]
mod tests;
Expand Down
107 changes: 56 additions & 51 deletions node/actors/bft/src/chonky_bft/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ use zksync_concurrency::{ctx, error::Wrap as _, sync};
use zksync_consensus_network::io::ConsensusInputMessage;
use zksync_consensus_roles::validator;

/// In a loop, receives a PrepareQC and sends a LeaderPrepare containing it.
/// Every subsequent PrepareQC has to be for a higher view than the previous one (otherwise it
/// is skipped). In case payload generation takes too long, some PrepareQC may be elided, so
/// that the validator doesn't spend time on generating payloads for already expired views.
/// The proposer loop is responsible for proposing new blocks to the network. It watches for new
/// justifications from the replica and if it is the leader for the view, it proposes a new block.
pub(crate) async fn run_proposer(
ctx: &ctx::Ctx,
cfg: Arc<Config>,
Expand All @@ -20,63 +18,70 @@ pub(crate) async fn run_proposer(
continue;
};

let genesis = cfg.genesis();

// If we are not the leader for this view, skip it.
if genesis.view_leader(justification.view().number) != cfg.secret_key.public() {
if cfg.genesis().view_leader(justification.view().number) != cfg.secret_key.public() {
continue;
}

// Get the block number and check if this must be a reproposal.
let (block_number, opt_block_hash) = justification.get_implied_block(genesis);
let proposal = create_proposal(ctx, cfg.clone(), justification).await?;

let proposal_payload = match opt_block_hash {
// There was some proposal last view that a subquorum of replicas
// voted for and could have been finalized. We need to repropose it.
Some(_) => None,
// The previous proposal was finalized, so we can propose a new block.
None => {
// Defensively assume that PayloadManager cannot propose until the previous block is stored.
// if we don't have the previous block, this call will halt until the other replicas timeout.
// This is fine as we can just not propose anything and let our turn end. Eventually, some other
// replica will produce some block with this block number and this function will unblock.
if let Some(prev) = block_number.prev() {
cfg.block_store.wait_until_persisted(ctx, prev).await?;
}
// Broadcast our proposal to all replicas (ourselves included).
let msg = cfg
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderProposal(proposal));

let payload = cfg
.payload_manager
.propose(ctx, block_number)
.await
.wrap("payload_manager.propose()")?;
pipe.send(ConsensusInputMessage { message: msg }.into());
}
}

if payload.0.len() > cfg.max_payload_size {
return Err(anyhow::format_err!(
"proposed payload too large: got {}B, max {}B",
payload.0.len(),
cfg.max_payload_size
)
.into());
}
/// Creates a proposal for the given justification.
pub(crate) async fn create_proposal(
ctx: &ctx::Ctx,
cfg: Arc<Config>,
justification: validator::ProposalJustification,
) -> ctx::Result<validator::LeaderProposal> {
// Get the block number and check if this must be a reproposal.
let (block_number, opt_block_hash) = justification.get_implied_block(cfg.genesis());

metrics::METRICS
.leader_proposal_payload_size
.observe(payload.0.len());
let proposal_payload = match opt_block_hash {
// There was some proposal last view that a subquorum of replicas
// voted for and could have been finalized. We need to repropose it.
Some(_) => None,
// The previous proposal was finalized, so we can propose a new block.
None => {
// Defensively assume that PayloadManager cannot propose until the previous block is stored.
// if we don't have the previous block, this call will halt until the other replicas timeout.
// This is fine as we can just not propose anything and let our turn end. Eventually, some other
// replica will produce some block with this block number and this function will unblock.
if let Some(prev) = block_number.prev() {
cfg.block_store.wait_until_persisted(ctx, prev).await?;
}

Some(payload)
let payload = cfg
.payload_manager
.propose(ctx, block_number)
.await
.wrap("payload_manager.propose()")?;

if payload.0.len() > cfg.max_payload_size {
return Err(anyhow::format_err!(
"proposed payload too large: got {}B, max {}B",
payload.0.len(),
cfg.max_payload_size
)
.into());
}
};

// Broadcast our proposal to all replicas (ourselves included).
let msg = cfg
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderProposal(
validator::LeaderProposal {
proposal_payload,
justification,
},
));
metrics::METRICS
.leader_proposal_payload_size
.observe(payload.0.len());

pipe.send(ConsensusInputMessage { message: msg }.into());
}
Some(payload)
}
};

Ok(validator::LeaderProposal {
proposal_payload,
justification,
})
}
Loading

0 comments on commit 83df8bf

Please sign in to comment.