Skip to content

Commit

Permalink
update qs key loading (#15401)
Browse files Browse the repository at this point in the history
* qs key update

* lint

* expect consensus key to exist

* lint
  • Loading branch information
zjma authored Nov 28, 2024
1 parent c88fc0d commit bc4364b
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 48 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ aptos-executor = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-experimental-runtimes = { workspace = true }
aptos-fallible = { workspace = true }
aptos-global-constants = { workspace = true }
aptos-infallible = { workspace = true }
aptos-logger = { workspace = true }
aptos-mempool = { workspace = true }
Expand Down
13 changes: 9 additions & 4 deletions consensus/safety-rules/src/persistent_safety_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ impl PersistentSafetyStorage {
Ok(self.internal_store.get(OWNER_ACCOUNT).map(|v| v.value)?)
}

pub fn default_consensus_sk(
&self,
) -> Result<bls12381::PrivateKey, aptos_secure_storage::Error> {
self.internal_store
.get::<bls12381::PrivateKey>(CONSENSUS_KEY)
.map(|v| v.value)
}

pub fn consensus_sk_by_pk(
&self,
pk: bls12381::PublicKey,
Expand All @@ -107,10 +115,7 @@ impl PersistentSafetyStorage {
.internal_store
.get::<bls12381::PrivateKey>(explicit_storage_key.as_str())
.map(|v| v.value);
let default_sk = self
.internal_store
.get::<bls12381::PrivateKey>(CONSENSUS_KEY)
.map(|v| v.value);
let default_sk = self.default_consensus_sk();
let key = match (explicit_sk, default_sk) {
(Ok(sk_0), _) => sk_0,
(Err(_), Ok(sk_1)) => sk_1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ impl ConsensusObserver {
aptos_channel::new::<AccountAddress, IncomingRandGenRequest>(QueueStyle::FIFO, 1, None);
self.execution_client
.start_epoch(
Some(sk),
sk,
epoch_state.clone(),
dummy_signer.clone(),
payload_manager,
Expand Down
56 changes: 34 additions & 22 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
epoch_state: &EpochState,
network_sender: NetworkSender,
consensus_config: &OnChainConsensusConfig,
consensus_key: Arc<PrivateKey>,
) -> (
Arc<dyn TPayloadManager>,
QuorumStoreClient,
Expand Down Expand Up @@ -701,6 +702,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
self.config.safety_rules.backend.clone(),
self.quorum_store_storage.clone(),
!consensus_config.is_dag_enabled(),
consensus_key,
))
} else {
info!("Building DirectMempool");
Expand Down Expand Up @@ -748,7 +750,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

async fn start_round_manager(
&mut self,
consensus_key: Option<Arc<PrivateKey>>,
consensus_key: Arc<PrivateKey>,
recovery_data: RecoveryData,
epoch_state: Arc<EpochState>,
onchain_consensus_config: OnChainConsensusConfig,
Expand Down Expand Up @@ -823,8 +825,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
recovery_data.root_block().round(),
)
.await;
let consensus_sk =
consensus_key.expect("consensus key unavailable for ExecutionProxyClient");
let consensus_sk = consensus_key;

let maybe_pipeline_builder = if self.config.enable_pipeline {
let signer = Arc::new(ValidatorSigner::new(self.author, consensus_sk));
Expand Down Expand Up @@ -952,7 +953,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

fn try_get_rand_config_for_new_epoch(
&self,
maybe_consensus_key: Option<Arc<PrivateKey>>,
consensus_key: Arc<PrivateKey>,
new_epoch_state: &EpochState,
onchain_randomness_config: &OnChainRandomnessConfig,
maybe_dkg_state: anyhow::Result<DKGState>,
Expand Down Expand Up @@ -981,8 +982,6 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
.copied()
.ok_or_else(|| NoRandomnessReason::NotInValidatorSet)?;

let consensus_key =
maybe_consensus_key.ok_or(NoRandomnessReason::ConsensusKeyUnavailable)?;
let dkg_decrypt_key = maybe_dk_from_bls_sk(consensus_key.as_ref())
.map_err(NoRandomnessReason::ErrConvertingConsensusKeyToDecryptionKey)?;
let transcript = bcs::from_bytes::<<DefaultDKG as DKGTrait>::Transcript>(
Expand Down Expand Up @@ -1158,10 +1157,9 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
});

let loaded_consensus_key = match self.load_consensus_key(&epoch_state.verifier) {
Ok(k) => Some(Arc::new(k)),
Ok(k) => Arc::new(k),
Err(e) => {
warn!("load_consensus_key failed: {e}");
None
panic!("load_consensus_key failed: {e}");
},
};

Expand Down Expand Up @@ -1199,7 +1197,11 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
);

let (network_sender, payload_client, payload_manager) = self
.initialize_shared_component(&epoch_state, &consensus_config)
.initialize_shared_component(
&epoch_state,
&consensus_config,
loaded_consensus_key.clone(),
)
.await;

let (rand_msg_tx, rand_msg_rx) = aptos_channel::new::<AccountAddress, IncomingRandGenRequest>(
Expand Down Expand Up @@ -1249,6 +1251,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
&mut self,
epoch_state: &EpochState,
consensus_config: &OnChainConsensusConfig,
consensus_key: Arc<PrivateKey>,
) -> (
NetworkSender,
Arc<dyn PayloadClient>,
Expand All @@ -1258,7 +1261,12 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
self.quorum_store_enabled = self.enable_quorum_store(consensus_config);
let network_sender = self.create_network_sender(epoch_state);
let (payload_manager, quorum_store_client, quorum_store_builder) = self
.init_payload_provider(epoch_state, network_sender.clone(), consensus_config)
.init_payload_provider(
epoch_state,
network_sender.clone(),
consensus_config,
consensus_key,
)
.await;
let effective_vtxn_config = consensus_config.effective_validator_txn_config();
debug!("effective_vtxn_config={:?}", effective_vtxn_config);
Expand All @@ -1277,7 +1285,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

async fn start_new_epoch_with_joltean(
&mut self,
consensus_key: Option<Arc<PrivateKey>>,
consensus_key: Arc<PrivateKey>,
epoch_state: Arc<EpochState>,
consensus_config: OnChainConsensusConfig,
execution_config: OnChainExecutionConfig,
Expand Down Expand Up @@ -1326,7 +1334,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
async fn start_new_epoch_with_dag(
&mut self,
epoch_state: Arc<EpochState>,
loaded_consensus_key: Option<Arc<PrivateKey>>,
loaded_consensus_key: Arc<PrivateKey>,
onchain_consensus_config: OnChainConsensusConfig,
on_chain_execution_config: OnChainExecutionConfig,
onchain_randomness_config: OnChainRandomnessConfig,
Expand All @@ -1341,9 +1349,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
let epoch = epoch_state.epoch;
let signer = Arc::new(ValidatorSigner::new(
self.author,
loaded_consensus_key
.clone()
.expect("unable to get private key"),
loaded_consensus_key.clone(),
));
let commit_signer = Arc::new(DagCommitSigner::new(signer.clone()));

Expand Down Expand Up @@ -1796,12 +1802,18 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
}

fn load_consensus_key(&self, vv: &ValidatorVerifier) -> anyhow::Result<PrivateKey> {
let pk = vv
.get_public_key(&self.author)
.ok_or_else(|| anyhow!("i am not in the validator set!"))?;
self.key_storage
.consensus_sk_by_pk(pk)
.map_err(|e| anyhow!("could not find sk by pk: {:?}", e))
match vv.get_public_key(&self.author) {
Some(pk) => self
.key_storage
.consensus_sk_by_pk(pk)
.map_err(|e| anyhow!("could not find sk by pk: {:?}", e)),
None => {
warn!("could not find my pk in validator set, loading default sk!");
self.key_storage
.default_consensus_sk()
.map_err(|e| anyhow!("could not load default sk: {e}"))
},
}
}
}

Expand Down
10 changes: 4 additions & 6 deletions consensus/src/pipeline/execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub trait TExecutionClient: Send + Sync {
/// Initialize the execution phase for a new epoch.
async fn start_epoch(
&self,
maybe_consensus_key: Option<Arc<PrivateKey>>,
maybe_consensus_key: Arc<PrivateKey>,
epoch_state: Arc<EpochState>,
commit_signer_provider: Arc<dyn CommitSignerProvider>,
payload_manager: Arc<dyn TPayloadManager>,
Expand Down Expand Up @@ -196,7 +196,7 @@ impl ExecutionProxyClient {

fn spawn_decoupled_execution(
&self,
maybe_consensus_key: Option<Arc<PrivateKey>>,
consensus_sk: Arc<PrivateKey>,
commit_signer_provider: Arc<dyn CommitSignerProvider>,
epoch_state: Arc<EpochState>,
rand_config: Option<RandConfig>,
Expand Down Expand Up @@ -230,8 +230,6 @@ impl ExecutionProxyClient {
let (rand_ready_block_tx, rand_ready_block_rx) = unbounded::<OrderedBlocks>();

let (reset_tx_to_rand_manager, reset_rand_manager_rx) = unbounded::<ResetRequest>();
let consensus_sk = maybe_consensus_key
.expect("consensus key unavailable for ExecutionProxyClient");
let signer = Arc::new(ValidatorSigner::new(self.author, consensus_sk));

let rand_manager = RandManager::<Share, AugmentedData>::new(
Expand Down Expand Up @@ -310,7 +308,7 @@ impl ExecutionProxyClient {
impl TExecutionClient for ExecutionProxyClient {
async fn start_epoch(
&self,
maybe_consensus_key: Option<Arc<PrivateKey>>,
maybe_consensus_key: Arc<PrivateKey>,
epoch_state: Arc<EpochState>,
commit_signer_provider: Arc<dyn CommitSignerProvider>,
payload_manager: Arc<dyn TPayloadManager>,
Expand Down Expand Up @@ -526,7 +524,7 @@ pub struct DummyExecutionClient;
impl TExecutionClient for DummyExecutionClient {
async fn start_epoch(
&self,
_maybe_consensus_key: Option<Arc<PrivateKey>>,
_maybe_consensus_key: Arc<PrivateKey>,
_epoch_state: Arc<EpochState>,
_commit_signer_provider: Arc<dyn CommitSignerProvider>,
_payload_manager: Arc<dyn TPayloadManager>,
Expand Down
18 changes: 6 additions & 12 deletions consensus/src/quorum_store/quorum_store_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ use aptos_config::config::{QuorumStoreConfig, SecureBackend};
use aptos_consensus_types::{
common::Author, proof_of_store::ProofCache, request_response::GetPayloadCommand,
};
use aptos_global_constants::CONSENSUS_KEY;
use aptos_crypto::bls12381::PrivateKey;
use aptos_logger::prelude::*;
use aptos_mempool::QuorumStoreRequest;
use aptos_secure_storage::{KVStorage, Storage};
use aptos_storage_interface::DbReader;
use aptos_types::{
account_address::AccountAddress, validator_signer::ValidatorSigner,
Expand Down Expand Up @@ -148,9 +147,11 @@ pub struct InnerBuilder {
batch_store: Option<Arc<BatchStore>>,
batch_reader: Option<Arc<dyn BatchReader>>,
broadcast_proofs: bool,
consensus_key: Arc<PrivateKey>,
}

impl InnerBuilder {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
epoch: u64,
author: Author,
Expand All @@ -166,6 +167,7 @@ impl InnerBuilder {
backend: SecureBackend,
quorum_store_storage: Arc<dyn QuorumStoreStorage>,
broadcast_proofs: bool,
consensus_key: Arc<PrivateKey>,
) -> Self {
let (coordinator_tx, coordinator_rx) = futures_channel::mpsc::channel(config.channel_size);
let (batch_generator_cmd_tx, batch_generator_cmd_rx) =
Expand Down Expand Up @@ -221,20 +223,12 @@ impl InnerBuilder {
batch_store: None,
batch_reader: None,
broadcast_proofs,
consensus_key,
}
}

fn create_batch_store(&mut self) -> Arc<BatchReaderImpl<NetworkSender>> {
let backend = &self.backend;
let storage: Storage = backend.into();
if let Err(error) = storage.available() {
panic!("Storage is not available: {:?}", error);
}
let private_key = storage
.get(CONSENSUS_KEY)
.map(|v| v.value)
.expect("Unable to get private key");
let signer = ValidatorSigner::new(self.author, private_key);
let signer = ValidatorSigner::new(self.author, self.consensus_key.clone());

let latest_ledger_info_with_sigs = self
.aptos_db
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/test_utils/mock_execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl MockExecutionClient {
impl TExecutionClient for MockExecutionClient {
async fn start_epoch(
&self,
_maybe_consensus_key: Option<Arc<PrivateKey>>,
_maybe_consensus_key: Arc<PrivateKey>,
_epoch_state: Arc<EpochState>,
_commit_signer_provider: Arc<dyn CommitSignerProvider>,
_payload_manager: Arc<dyn TPayloadManager>,
Expand Down

0 comments on commit bc4364b

Please sign in to comment.