Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy loading improvements #3066

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions node/service/src/lazy_loading/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ use sp_state_machine::{
};
use std::future::Future;
use std::marker::PhantomData;
use std::ops::AddAssign;
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
ptr,
sync::Arc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

use sc_client_api::{
Expand Down Expand Up @@ -1457,7 +1459,7 @@ pub struct RPC {
http_client: HttpClient,
delay_between_requests_ms: u32,
max_retries_per_request: u32,
counter: Arc<ReadWriteLock<u64>>,
counter: Arc<AtomicU64>,
}

impl RPC {
Expand Down Expand Up @@ -1652,17 +1654,19 @@ impl RPC {
{
use tokio::runtime::Handle;

let id = self.counter.fetch_add(1, Ordering::SeqCst);
let start = std::time::Instant::now();

tokio::task::block_in_place(move || {
Handle::current().block_on(async move {
let delay_between_requests =
Duration::from_millis(self.delay_between_requests_ms.into());

let start = std::time::Instant::now();
self.counter.write().add_assign(1);
let start_req = std::time::Instant::now();
log::debug!(
target: super::LAZY_LOADING_LOG_TARGET,
"Sending request: {}",
self.counter.read()
id
);

// Explicit request delay, to avoid getting 429 errors
Expand All @@ -1676,10 +1680,11 @@ impl RPC {

log::debug!(
target: super::LAZY_LOADING_LOG_TARGET,
"Completed request (id: {}, successful: {}, elapsed_time: {:?})",
self.counter.read(),
"Completed request (id: {}, successful: {}, elapsed_time: {:?}, query_time: {:?})",
id,
result.is_ok(),
start.elapsed()
start.elapsed(),
start_req.elapsed()
);

result
Expand Down
213 changes: 213 additions & 0 deletions node/service/src/lazy_loading/manual_sealing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2024 Moonbeam foundation
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam. If not, see <http://www.gnu.org/licenses/>.

use cumulus_primitives_core::BlockT;
use frame_benchmarking::__private::codec;
use futures::{Stream, StreamExt, TryFutureExt};
use sc_client_api::backend::Backend as ClientBackend;
use sc_client_api::Finalizer;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction};
use sc_consensus_manual_seal::{
finalize_block, rpc, CreatedBlock, EngineCommand, Error, FinalizeBlockParams, ManualSealParams,
SealBlockParams, MANUAL_SEAL_ENGINE_ID,
};
use sc_transaction_pool_api::TransactionPool;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::{BlockOrigin, Environment, Proposer, SelectChain};
use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
use sp_runtime::traits::Header;
use std::marker::PhantomData;
use std::time::Duration;

pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
ManualSealParams {
mut block_import,
mut env,
client,
pool,
mut commands_stream,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
}: ManualSealParams<B, BI, E, C, TP, SC, CS, CIDP, P>,
) where
B: BlockT + 'static,
BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static,
E::Proposer: Proposer<B, Proof = P>,
CS: Stream<Item = EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
SC: SelectChain<B> + 'static,
TP: TransactionPool<Block = B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: codec::Encode + Send + Sync + 'static,
{
while let Some(command) = commands_stream.next().await {
match command {
EngineCommand::SealNewBlock {
create_empty,
finalize,
parent_hash,
sender,
} => {
seal_block(SealBlockParams {
sender,
parent_hash,
finalize,
create_empty,
env: &mut env,
select_chain: &select_chain,
block_import: &mut block_import,
consensus_data_provider: consensus_data_provider.as_deref(),
pool: pool.clone(),
client: client.clone(),
create_inherent_data_providers: &create_inherent_data_providers,
})
.await;
}
EngineCommand::FinalizeBlock {
hash,
sender,
justification,
} => {
let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j));
finalize_block(FinalizeBlockParams {
hash,
sender,
justification,
finalizer: client.clone(),
_phantom: PhantomData,
})
.await
}
}
}
}

/// max duration for creating a proposal in secs
pub const MAX_PROPOSAL_DURATION: u64 = 60;

/// seals a new block with the given params
pub async fn seal_block<B, BI, SC, C, E, TP, CIDP, P>(
SealBlockParams {
create_empty,
finalize,
pool,
parent_hash,
client,
select_chain,
block_import,
env,
create_inherent_data_providers,
consensus_data_provider: digest_provider,
mut sender,
}: SealBlockParams<'_, B, BI, SC, C, E, TP, CIDP, P>,
) where
B: BlockT,
BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
E: Environment<B>,
E::Proposer: Proposer<B, Proof = P>,
TP: TransactionPool<Block = B>,
SC: SelectChain<B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: codec::Encode + Send + Sync + 'static,
{
let future = async {
if pool.status().ready == 0 && !create_empty {
return Err(Error::EmptyTransactionPool);
}

// get the header to build this new block on.
// use the parent_hash supplied via `EngineCommand`
// or fetch the best_block.
let parent = match parent_hash {
Some(hash) => client
.header(hash)?
.ok_or_else(|| Error::BlockNotFound(format!("{}", hash)))?,
None => select_chain.best_chain().await?,
};

let inherent_data_providers = create_inherent_data_providers
.create_inherent_data_providers(parent.hash(), ())
.await
.map_err(|e| Error::Other(e))?;

let inherent_data = inherent_data_providers.create_inherent_data().await?;

let proposer = env
.init(&parent)
.map_err(|err| Error::StringError(err.to_string()))
.await?;
let inherents_len = inherent_data.len();

let digest = if let Some(digest_provider) = digest_provider {
digest_provider.create_digest(&parent, &inherent_data)?
} else {
Default::default()
};

let proposal = proposer
.propose(
inherent_data.clone(),
digest,
Duration::from_secs(MAX_PROPOSAL_DURATION),
None,
)
.map_err(|err| Error::StringError(err.to_string()))
.await?;

if proposal.block.extrinsics().len() == inherents_len && !create_empty {
return Err(Error::EmptyTransactionPool);
}

let (header, body) = proposal.block.deconstruct();
let proof = proposal.proof;
let proof_size = proof.encoded_size();
let mut params = BlockImportParams::new(BlockOrigin::Own, header.clone());
params.body = Some(body);
params.finalized = finalize;
params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(
proposal.storage_changes,
));

if let Some(digest_provider) = digest_provider {
digest_provider.append_block_import(&parent, &mut params, &inherent_data, proof)?;
}

// Make sure we return the same post-hash that will be calculated when importing the block
// This is important in case the digest_provider added any signature, seal, ect.
let mut post_header = header.clone();
post_header
.digest_mut()
.logs
.extend(params.post_digests.iter().cloned());

match block_import.import_block(params).await? {
ImportResult::Imported(aux) => Ok(CreatedBlock {
hash: <B as BlockT>::Header::hash(&post_header),
aux,
proof_size,
}),
other => Err(other.into()),
}
};

rpc::send_result(&mut sender, future.await)
}
5 changes: 3 additions & 2 deletions node/service/src/lazy_loading/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub mod call_executor;
mod client;
mod helpers;
mod lock;
mod manual_sealing;
mod state_overrides;

pub const LAZY_LOADING_LOG_TARGET: &'static str = "lazy-loading";
Expand Down Expand Up @@ -379,7 +380,7 @@ where
{
use async_io::Timer;
use futures::Stream;
use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
use sc_consensus_manual_seal::{EngineCommand, ManualSealParams};

let sc_service::PartialComponents {
client,
Expand Down Expand Up @@ -572,7 +573,7 @@ where
task_manager.spawn_essential_handle().spawn_blocking(
"authorship_task",
Some("block-authoring"),
run_manual_seal(ManualSealParams {
manual_sealing::run_manual_seal(ManualSealParams {
block_import,
env,
client: client.clone(),
Expand Down
Loading