From 3ae05a92b5393e1ec94a0d558b8a5ecf16b225f8 Mon Sep 17 00:00:00 2001 From: Rodrigo Quelhas Date: Sun, 1 Dec 2024 15:36:26 +0000 Subject: [PATCH 1/5] Increase authoring deadline to 60 seconds when using lazy loading --- .../src/lazy_loading/manual_sealing.rs | 213 ++++++++++++++++++ node/service/src/lazy_loading/mod.rs | 5 +- 2 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 node/service/src/lazy_loading/manual_sealing.rs diff --git a/node/service/src/lazy_loading/manual_sealing.rs b/node/service/src/lazy_loading/manual_sealing.rs new file mode 100644 index 0000000000..e99218fff1 --- /dev/null +++ b/node/service/src/lazy_loading/manual_sealing.rs @@ -0,0 +1,213 @@ +// Copyright 2024 Moonbeam foundation +// This file is part of Moonbeam. + +// Moonbeam is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonbeam is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonbeam. If not, see . + +use cumulus_primitives_core::BlockT; +use frame_benchmarking::__private::codec; +use futures::{Stream, StreamExt, TryFutureExt}; +use sc_client_api::backend::Backend as ClientBackend; +use sc_client_api::Finalizer; +use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction}; +use sc_consensus_manual_seal::{ + finalize_block, rpc, CreatedBlock, EngineCommand, Error, FinalizeBlockParams, ManualSealParams, + SealBlockParams, MANUAL_SEAL_ENGINE_ID, +}; +use sc_transaction_pool_api::TransactionPool; +use sp_api::ProvideRuntimeApi; +use sp_blockchain::HeaderBackend; +use sp_consensus::{BlockOrigin, Environment, Proposer, SelectChain}; +use sp_inherents::{CreateInherentDataProviders, InherentDataProvider}; +use sp_runtime::traits::Header; +use std::marker::PhantomData; +use std::time::Duration; + +pub async fn run_manual_seal( + ManualSealParams { + mut block_import, + mut env, + client, + pool, + mut commands_stream, + select_chain, + consensus_data_provider, + create_inherent_data_providers, + }: ManualSealParams, +) where + B: BlockT + 'static, + BI: BlockImport + Send + Sync + 'static, + C: HeaderBackend + Finalizer + ProvideRuntimeApi + 'static, + CB: ClientBackend + 'static, + E: Environment + 'static, + E::Proposer: Proposer, + CS: Stream::Hash>> + Unpin + 'static, + SC: SelectChain + 'static, + TP: TransactionPool, + CIDP: CreateInherentDataProviders, + P: codec::Encode + Send + Sync + 'static, +{ + while let Some(command) = commands_stream.next().await { + match command { + EngineCommand::SealNewBlock { + create_empty, + finalize, + parent_hash, + sender, + } => { + seal_block(SealBlockParams { + sender, + parent_hash, + finalize, + create_empty, + env: &mut env, + select_chain: &select_chain, + block_import: &mut block_import, + consensus_data_provider: consensus_data_provider.as_deref(), + pool: pool.clone(), + client: client.clone(), + create_inherent_data_providers: &create_inherent_data_providers, + }) + .await; + } + EngineCommand::FinalizeBlock { + hash, + sender, + justification, + } => { + let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j)); + finalize_block(FinalizeBlockParams { + hash, + sender, + justification, + finalizer: client.clone(), + _phantom: PhantomData, + }) + .await + } + } + } +} + +/// max duration for creating a proposal in secs +pub const MAX_PROPOSAL_DURATION: u64 = 60; + +/// seals a new block with the given params +pub async fn seal_block( + SealBlockParams { + create_empty, + finalize, + pool, + parent_hash, + client, + select_chain, + block_import, + env, + create_inherent_data_providers, + consensus_data_provider: digest_provider, + mut sender, + }: SealBlockParams<'_, B, BI, SC, C, E, TP, CIDP, P>, +) where + B: BlockT, + BI: BlockImport + Send + Sync + 'static, + C: HeaderBackend + ProvideRuntimeApi, + E: Environment, + E::Proposer: Proposer, + TP: TransactionPool, + SC: SelectChain, + CIDP: CreateInherentDataProviders, + P: codec::Encode + Send + Sync + 'static, +{ + let future = async { + if pool.status().ready == 0 && !create_empty { + return Err(Error::EmptyTransactionPool); + } + + // get the header to build this new block on. + // use the parent_hash supplied via `EngineCommand` + // or fetch the best_block. + let parent = match parent_hash { + Some(hash) => client + .header(hash)? + .ok_or_else(|| Error::BlockNotFound(format!("{}", hash)))?, + None => select_chain.best_chain().await?, + }; + + let inherent_data_providers = create_inherent_data_providers + .create_inherent_data_providers(parent.hash(), ()) + .await + .map_err(|e| Error::Other(e))?; + + let inherent_data = inherent_data_providers.create_inherent_data().await?; + + let proposer = env + .init(&parent) + .map_err(|err| Error::StringError(err.to_string())) + .await?; + let inherents_len = inherent_data.len(); + + let digest = if let Some(digest_provider) = digest_provider { + digest_provider.create_digest(&parent, &inherent_data)? + } else { + Default::default() + }; + + let proposal = proposer + .propose( + inherent_data.clone(), + digest, + Duration::from_secs(MAX_PROPOSAL_DURATION), + None, + ) + .map_err(|err| Error::StringError(err.to_string())) + .await?; + + if proposal.block.extrinsics().len() == inherents_len && !create_empty { + return Err(Error::EmptyTransactionPool); + } + + let (header, body) = proposal.block.deconstruct(); + let proof = proposal.proof; + let proof_size = proof.encoded_size(); + let mut params = BlockImportParams::new(BlockOrigin::Own, header.clone()); + params.body = Some(body); + params.finalized = finalize; + params.fork_choice = Some(ForkChoiceStrategy::LongestChain); + params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes( + proposal.storage_changes, + )); + + if let Some(digest_provider) = digest_provider { + digest_provider.append_block_import(&parent, &mut params, &inherent_data, proof)?; + } + + // Make sure we return the same post-hash that will be calculated when importing the block + // This is important in case the digest_provider added any signature, seal, ect. + let mut post_header = header.clone(); + post_header + .digest_mut() + .logs + .extend(params.post_digests.iter().cloned()); + + match block_import.import_block(params).await? { + ImportResult::Imported(aux) => Ok(CreatedBlock { + hash: ::Header::hash(&post_header), + aux, + proof_size, + }), + other => Err(other.into()), + } + }; + + rpc::send_result(&mut sender, future.await) +} diff --git a/node/service/src/lazy_loading/mod.rs b/node/service/src/lazy_loading/mod.rs index a581619edf..3c5cda8978 100644 --- a/node/service/src/lazy_loading/mod.rs +++ b/node/service/src/lazy_loading/mod.rs @@ -62,6 +62,7 @@ pub mod call_executor; mod client; mod helpers; mod lock; +mod manual_sealing; mod state_overrides; pub const LAZY_LOADING_LOG_TARGET: &'static str = "lazy-loading"; @@ -379,7 +380,7 @@ where { use async_io::Timer; use futures::Stream; - use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams}; + use sc_consensus_manual_seal::{EngineCommand, ManualSealParams}; let sc_service::PartialComponents { client, @@ -572,7 +573,7 @@ where task_manager.spawn_essential_handle().spawn_blocking( "authorship_task", Some("block-authoring"), - run_manual_seal(ManualSealParams { + manual_sealing::run_manual_seal(ManualSealParams { block_import, env, client: client.clone(), From e84ad7fa8e2a0470dd057cadaaa87a3c5e5363f1 Mon Sep 17 00:00:00 2001 From: crystalin Date: Sun, 1 Dec 2024 21:17:16 +0100 Subject: [PATCH 2/5] Change lazy loading counter to be thread-safe --- node/service/src/lazy_loading/backend.rs | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/node/service/src/lazy_loading/backend.rs b/node/service/src/lazy_loading/backend.rs index 7aadd9dc42..2aae0988d4 100644 --- a/node/service/src/lazy_loading/backend.rs +++ b/node/service/src/lazy_loading/backend.rs @@ -31,7 +31,10 @@ use std::time::Duration; use std::{ collections::{HashMap, HashSet}, ptr, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, }; use sc_client_api::{ @@ -1457,7 +1460,7 @@ pub struct RPC { http_client: HttpClient, delay_between_requests_ms: u64, max_retries_per_request: usize, - counter: Arc>, + counter: Arc, } impl RPC { @@ -1652,16 +1655,18 @@ impl RPC { { use tokio::runtime::Handle; + let id = self.counter.fetch_add(1, Ordering::SeqCst); + let start = std::time::Instant::now(); + tokio::task::block_in_place(move || { Handle::current().block_on(async move { let delay_between_requests = Duration::from_millis(self.delay_between_requests_ms); - let start = std::time::Instant::now(); - self.counter.write().add_assign(1); + let start_req = std::time::Instant::now(); log::debug!( target: super::LAZY_LOADING_LOG_TARGET, "Sending request: {}", - self.counter.read() + id ); // Explicit request delay, to avoid getting 429 errors @@ -1675,10 +1680,11 @@ impl RPC { log::debug!( target: super::LAZY_LOADING_LOG_TARGET, - "Completed request (id: {}, successful: {}, elapsed_time: {:?})", - self.counter.read(), + "Completed request (id: {}, successful: {}, elapsed_time: {:?}, query_time: {:?})", + id, result.is_ok(), - start.elapsed() + start.elapsed(), + start_req.elapsed() ); result From bb87e9e5bbae1ed71000c733aabe58eb81d84a0a Mon Sep 17 00:00:00 2001 From: crystalin Date: Sun, 1 Dec 2024 21:32:43 +0100 Subject: [PATCH 3/5] remove extras --- node/service/src/lazy_loading/backend.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/node/service/src/lazy_loading/backend.rs b/node/service/src/lazy_loading/backend.rs index 2aae0988d4..b644581ed4 100644 --- a/node/service/src/lazy_loading/backend.rs +++ b/node/service/src/lazy_loading/backend.rs @@ -26,7 +26,6 @@ use sp_state_machine::{ }; use std::future::Future; use std::marker::PhantomData; -use std::ops::AddAssign; use std::time::Duration; use std::{ collections::{HashMap, HashSet}, From 25eb188406ade4c1df7774696a53fd482df3754c Mon Sep 17 00:00:00 2001 From: Rodrigo Quelhas Date: Tue, 10 Dec 2024 15:37:55 +0000 Subject: [PATCH 4/5] chore: fiz block finalization when using lazy loading --- node/service/src/lazy_loading/backend.rs | 26 +++++++-------- node/service/src/lazy_loading/helpers.rs | 32 +------------------ .../lazy-loading/test-runtime-upgrade.ts | 9 +++--- 3 files changed, 16 insertions(+), 51 deletions(-) diff --git a/node/service/src/lazy_loading/backend.rs b/node/service/src/lazy_loading/backend.rs index d747602f50..623498aec6 100644 --- a/node/service/src/lazy_loading/backend.rs +++ b/node/service/src/lazy_loading/backend.rs @@ -197,8 +197,10 @@ impl Blockchain { self.apply_head(&header)?; } - { - let mut storage = self.storage.write(); + let mut storage = self.storage.write(); + if number.is_zero() { + storage.genesis_hash = hash; + } else { storage.leaves.import(hash, number, *header.parent_hash()); storage .blocks @@ -208,10 +210,6 @@ impl Blockchain { storage.finalized_hash = hash; storage.finalized_number = number; } - - if number == Zero::zero() { - storage.genesis_hash = hash; - } } Ok(()) @@ -501,7 +499,9 @@ impl blockchain::Backend for Blockchain } fn leaves(&self) -> sp_blockchain::Result> { - Ok(self.storage.read().leaves.hashes()) + let leaves = self.storage.read().leaves.hashes(); + + Ok(leaves) } fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result> { @@ -951,7 +951,7 @@ impl ForkedLazyBackend { let mut entries: HashMap, StorageCollection> = Default::default(); entries.insert(None, vec![(key.to_vec(), Some(val.clone()))]); - self.db.write().insert(entries, StateVersion::V0); + self.db.write().insert(entries, StateVersion::V1); } } } @@ -1257,10 +1257,8 @@ impl backend::Backend for Backend sp_blockchain::Result<()> { - if !operation.finalized_blocks.is_empty() { - for (block, justification) in operation.finalized_blocks { - self.blockchain.finalize_header(block, justification)?; - } + for (block, justification) in operation.finalized_blocks { + self.blockchain.finalize_header(block, justification)?; } if let Some(pending_block) = operation.pending_block { @@ -1280,7 +1278,7 @@ impl backend::Backend for Backend, operation.storage_updates)], - StateVersion::V0, + StateVersion::V1, ); let new_state = ForkedLazyBackend { rpc_client: self.rpc_client.clone(), @@ -1769,8 +1767,6 @@ where }) .collect(); - let _ = helpers::produce_genesis_block(backend.clone()); - // Produce first block after the fork let _ = helpers::produce_first_block(backend.clone(), checkpoint, state_overrides)?; diff --git a/node/service/src/lazy_loading/helpers.rs b/node/service/src/lazy_loading/helpers.rs index 6492b9850a..dad50bc5ff 100644 --- a/node/service/src/lazy_loading/helpers.rs +++ b/node/service/src/lazy_loading/helpers.rs @@ -25,36 +25,6 @@ use sp_runtime::Saturating; use sp_storage::{StateVersion, Storage, StorageKey}; use std::sync::Arc; -pub fn produce_genesis_block( - backend: Arc>, -) -> sp_blockchain::Result<()> { - let mut op = backend.begin_operation()?; - op.before_fork = true; - - let genesis_block_hash: TBl::Hash = backend - .rpc_client - .block_hash::(Some(Default::default())) - .unwrap() - .expect("Not able to obtain genesis block hash"); - - let genesis_block = backend - .rpc_client - .block::(Some(genesis_block_hash)) - .unwrap() - .unwrap() - .block; - - let _ = op.set_block_data( - genesis_block.header().clone(), - Some(genesis_block.extrinsics().to_vec()), - None, - None, - NewBlockState::Final, - ); - - backend.commit_operation(op) -} - pub fn produce_first_block( backend: Arc>, fork_checkpoint: Block, @@ -89,7 +59,7 @@ pub fn produce_first_block( top: state_overrides.into_iter().collect(), children_default: Default::default(), }, - StateVersion::V0, + StateVersion::V1, )?; // Create empty first block diff --git a/test/suites/lazy-loading/test-runtime-upgrade.ts b/test/suites/lazy-loading/test-runtime-upgrade.ts index e8b8e17af5..70cdbeb7d7 100644 --- a/test/suites/lazy-loading/test-runtime-upgrade.ts +++ b/test/suites/lazy-loading/test-runtime-upgrade.ts @@ -38,10 +38,9 @@ describeSuite({ log("Current runtime:", rtBefore); log("About to upgrade to runtime at:", wasmPath); - await context.createBlock([], { finalize: false }); + await context.createBlock(); const { result } = await context.createBlock( - api.tx.system.applyAuthorizedUpgrade(runtimeWasmHex), - { finalize: false } + api.tx.system.applyAuthorizedUpgrade(runtimeWasmHex) ); assert(result, "Block has no extrinsic results"); @@ -75,9 +74,9 @@ describeSuite({ } // This next block will receive the GoAhead signal - await context.createBlock([], { finalize: false }); + await context.createBlock(); // The next block will process the runtime upgrade - await context.createBlock([], { finalize: false }); + await context.createBlock(); const events = (await api.query.system.events()).filter(({ event }) => api.events.migrations.RuntimeUpgradeCompleted.is(event) From 3e4bedd09a042ddf607f73f15431eaa3b78e44c8 Mon Sep 17 00:00:00 2001 From: Rodrigo Quelhas Date: Tue, 10 Dec 2024 15:39:13 +0000 Subject: [PATCH 5/5] disabled frontier block synchronization when in lazy loading --- node/service/src/lazy_loading/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/node/service/src/lazy_loading/mod.rs b/node/service/src/lazy_loading/mod.rs index a423a1b19c..ba3c992d23 100644 --- a/node/service/src/lazy_loading/mod.rs +++ b/node/service/src/lazy_loading/mod.rs @@ -694,6 +694,7 @@ where > = Default::default(); let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + /* TODO: only enable this when frontier backend is compatible with lazy-loading rpc::spawn_essential_tasks( rpc::SpawnTasksParams { task_manager: &task_manager, @@ -708,6 +709,8 @@ where sync_service.clone(), pubsub_notification_sinks.clone(), ); + */ + let ethapi_cmd = rpc_config.ethapi.clone(); let tracing_requesters = if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {