Skip to content

Commit

Permalink
wip: pevm integration for historical sync till Cancun
Browse files Browse the repository at this point in the history
  • Loading branch information
hai-rise committed Oct 4, 2024
1 parent d72e438 commit 14ad262
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 89 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -406,15 +406,15 @@ reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }

# revm
revm = { version = "14.0.3", features = [
revm = { git = "https://github.com/risechain/revm", rev = "53cdccac81148ef5bbdedfc34c7d526a5fb7d494", features = [
"std",
"secp256k1",
"blst",
], default-features = false }
revm-inspectors = "0.8.1"
revm-primitives = { version = "10.0.0", features = [
revm-primitives = { git = "https://github.com/risechain/revm", rev = "53cdccac81148ef5bbdedfc34c7d526a5fb7d494", features = [
"std",
], default-features = false }
revm-inspectors = { git = "https://github.com/risechain/revm-inspectors", rev = "ec5ba7d2ce40a7f7aeae1d869365f2e4b8d8df02" }

# eth
alloy-chains = "0.1.32"
Expand Down
2 changes: 2 additions & 0 deletions crates/ethereum/evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ alloy-primitives.workspace = true
alloy-eips.workspace = true
alloy-sol-types.workspace = true

pevm = { git = "https://github.com/risechain/pevm", rev = "1955ff251c2c1eb38f878e4eb4444edc1df2a4e7" }

[dev-dependencies]
reth-testing-utils.workspace = true
reth-revm = { workspace = true, features = ["test-utils"] }
Expand Down
126 changes: 96 additions & 30 deletions crates/ethereum/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::{
};
use alloc::{boxed::Box, sync::Arc, vec, vec::Vec};
use alloy_primitives::{BlockNumber, U256};
use core::fmt::Display;
use core::{fmt::Display, num::NonZeroUsize};
use pevm::{chain::PevmEthereum, Pevm};
use reth_chainspec::{ChainSpec, EthereumHardforks, MAINNET};
use reth_ethereum_consensus::validate_block_post_execution;
use reth_evm::{
Expand All @@ -27,7 +28,7 @@ use reth_revm::{
Evm,
};
use revm_primitives::{
db::{Database, DatabaseCommit},
db::{DatabaseCommit, DatabaseRef},
BlockEnv, CfgEnvWithHandlerCfg, EnvWithHandlerCfg, ResultAndState,
};

Expand Down Expand Up @@ -63,12 +64,15 @@ where
{
fn eth_executor<DB>(&self, db: DB) -> EthBlockExecutor<EvmConfig, DB>
where
DB: Database<Error: Into<ProviderError>>,
DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync,
{
EthBlockExecutor::new(
self.chain_spec.clone(),
self.evm_config.clone(),
State::builder().with_database(db).with_bundle_update().without_state_clear().build(),
Pevm::default(),
PevmEthereum::mainnet(),
NonZeroUsize::new(8).unwrap(),
)
}
}
Expand All @@ -77,22 +81,22 @@ impl<EvmConfig> BlockExecutorProvider for EthExecutorProvider<EvmConfig>
where
EvmConfig: ConfigureEvm<Header = Header>,
{
type Executor<DB: Database<Error: Into<ProviderError> + Display>> =
type Executor<DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync> =
EthBlockExecutor<EvmConfig, DB>;

type BatchExecutor<DB: Database<Error: Into<ProviderError> + Display>> =
type BatchExecutor<DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync> =
EthBatchExecutor<EvmConfig, DB>;

fn executor<DB>(&self, db: DB) -> Self::Executor<DB>
where
DB: Database<Error: Into<ProviderError> + Display>,
DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync,
{
self.eth_executor(db)
}

fn batch_executor<DB>(&self, db: DB) -> Self::BatchExecutor<DB>
where
DB: Database<Error: Into<ProviderError> + Display>,
DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync,
{
let executor = self.eth_executor(db);
EthBatchExecutor { executor, batch_record: BlockBatchRecord::default() }
Expand Down Expand Up @@ -130,14 +134,13 @@ where
///
/// It does __not__ apply post-execution changes that do not require an [EVM](Evm), for that see
/// [`EthBlockExecutor::post_execution`].
fn execute_state_transitions<Ext, DB>(
fn _execute_state_transitions<Ext, DB>(
&self,
block: &BlockWithSenders,
mut evm: Evm<'_, Ext, &mut State<DB>>,
) -> Result<EthExecuteOutput, BlockExecutionError>
) -> Result<(EthExecuteOutput, Vec<ResultAndState>), BlockExecutionError>
where
DB: Database,
DB::Error: Into<ProviderError> + Display,
DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync,
{
let mut system_caller = SystemCaller::new(&self.evm_config, &self.chain_spec);

Expand All @@ -146,6 +149,7 @@ where
// execute transactions
let mut cumulative_gas_used = 0;
let mut receipts = Vec::with_capacity(block.body.transactions.len());
let mut results = Vec::with_capacity(block.body.transactions.len());
for (sender, transaction) in block.transactions_with_sender() {
// The sum of the transaction’s gas limit, Tg, and the gas utilized in this block prior,
// must be no greater than the block’s gasLimit.
Expand All @@ -155,24 +159,25 @@ where
transaction_gas_limit: transaction.gas_limit(),
block_available_gas,
}
.into())
.into());
}

self.evm_config.fill_tx_env(evm.tx_mut(), transaction, *sender);

// Execute transaction.
let ResultAndState { result, state } = evm.transact().map_err(move |err| {
let result_and_state = evm.transact().map_err(move |err| {
let new_err = err.map_db_err(|e| e.into());
// Ensure hash is calculated for error log, if not already done
BlockValidationError::EVM {
hash: transaction.recalculate_hash(),
error: Box::new(new_err),
}
})?;
evm.db_mut().commit(state);
results.push(result_and_state.clone());
evm.db_mut().commit(result_and_state.state);

// append gas used
cumulative_gas_used += result.gas_used();
cumulative_gas_used += result_and_state.result.gas_used();

// Push transaction changeset and calculate header bloom filter for receipt.
receipts.push(
Expand All @@ -181,10 +186,10 @@ where
tx_type: transaction.tx_type(),
// Success flag was added in `EIP-658: Embedding transaction status code in
// receipts`.
success: result.is_success(),
success: result_and_state.result.is_success(),
cumulative_gas_used,
// convert to reth log
logs: result.into_logs(),
logs: result_and_state.result.into_logs(),
..Default::default()
},
);
Expand All @@ -202,7 +207,7 @@ where
vec![]
};

Ok(EthExecuteOutput { receipts, requests, gas_used: cumulative_gas_used })
Ok((EthExecuteOutput { receipts, requests, gas_used: cumulative_gas_used }, results))
}
}

Expand All @@ -217,12 +222,29 @@ pub struct EthBlockExecutor<EvmConfig, DB> {
executor: EthEvmExecutor<EvmConfig>,
/// The state to use for execution
state: State<DB>,
/// Parallel executor
pevm: Pevm,
chain: PevmEthereum,
concurrency_level: NonZeroUsize,
}

impl<EvmConfig, DB> EthBlockExecutor<EvmConfig, DB> {
/// Creates a new Ethereum block executor.
pub const fn new(chain_spec: Arc<ChainSpec>, evm_config: EvmConfig, state: State<DB>) -> Self {
Self { executor: EthEvmExecutor { chain_spec, evm_config }, state }
pub const fn new(
chain_spec: Arc<ChainSpec>,
evm_config: EvmConfig,
state: State<DB>,
pevm: Pevm,
chain: PevmEthereum,
concurrency_level: NonZeroUsize,
) -> Self {
Self {
executor: EthEvmExecutor { chain_spec, evm_config },
state,
pevm,
chain,
concurrency_level,
}
}

#[inline]
Expand All @@ -240,14 +262,14 @@ impl<EvmConfig, DB> EthBlockExecutor<EvmConfig, DB> {
impl<EvmConfig, DB> EthBlockExecutor<EvmConfig, DB>
where
EvmConfig: ConfigureEvm<Header = Header>,
DB: Database<Error: Into<ProviderError> + Display>,
DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync,
{
/// Configures a new evm configuration and block environment for the given block.
///
/// # Caution
///
/// This does not initialize the tx environment.
fn evm_env_for_block(&self, header: &Header, total_difficulty: U256) -> EnvWithHandlerCfg {
fn _evm_env_for_block(&self, header: &Header, total_difficulty: U256) -> EnvWithHandlerCfg {
let mut cfg = CfgEnvWithHandlerCfg::new(Default::default(), Default::default());
let mut block_env = BlockEnv::default();
self.executor.evm_config.fill_cfg_and_block_env(
Expand Down Expand Up @@ -275,16 +297,60 @@ where
self.on_new_block(&block.header);

// 2. configure the evm and execute
let env = self.evm_env_for_block(&block.header, total_difficulty);
let output = {
let evm = self.executor.evm_config.evm_with_env(&mut self.state, env);
self.executor.execute_state_transitions(block, evm)
}?;
let spec_id = crate::config::revm_spec(
self.chain_spec(),
&reth_chainspec::Head {
number: block.header.number,
timestamp: block.header.timestamp,
difficulty: block.header.difficulty,
total_difficulty,
hash: Default::default(),
},
);
let mut block_env = BlockEnv::default();
self.executor.evm_config.fill_block_env(
&mut block_env,
&block.header,
spec_id >= revm_primitives::SpecId::MERGE,
);
let mut tx_envs = Vec::with_capacity(block.body.transactions.len());
for (sender, transaction) in block.transactions_with_sender() {
let mut tx_env = revm_primitives::TxEnv::default();
self.executor.evm_config.fill_tx_env(&mut tx_env, transaction, *sender);
tx_envs.push(tx_env);
}
let results =
if tx_envs.len() < self.concurrency_level.into() || block.header.gas_used < 4_000_000 {
pevm::execute_revm_sequential(&self.state, &self.chain, spec_id, block_env, tx_envs)
} else {
self.pevm.execute_revm_parallel(
&self.state,
&self.chain,
spec_id,
block_env,
tx_envs,
self.concurrency_level,
)
}
.unwrap_or_else(|err| panic!("{:?}", err));
let mut cumulative_gas_used = 0;
let mut receipts = Vec::with_capacity(block.body.transactions.len());
for (tx, ResultAndState { result, state }) in block.body.transactions().zip(results) {
cumulative_gas_used += result.gas_used();
receipts.push(Receipt {
tx_type: tx.tx_type(),
success: result.is_success(),
cumulative_gas_used,
logs: result.into_logs(),
..Default::default()
});
self.state.commit(state);
}

// 3. apply post execution changes
self.post_execution(block, total_difficulty)?;

Ok(output)
Ok(EthExecuteOutput { receipts, requests: Vec::new(), gas_used: cumulative_gas_used })
}

/// Apply settings before a new block is executed.
Expand Down Expand Up @@ -329,7 +395,7 @@ where
impl<EvmConfig, DB> Executor<DB> for EthBlockExecutor<EvmConfig, DB>
where
EvmConfig: ConfigureEvm<Header = Header>,
DB: Database<Error: Into<ProviderError> + Display>,
DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync,
{
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>;
type Output = BlockExecutionOutput<Receipt>;
Expand Down Expand Up @@ -393,7 +459,7 @@ impl<EvmConfig, DB> EthBatchExecutor<EvmConfig, DB> {
impl<EvmConfig, DB> BatchExecutor<DB> for EthBatchExecutor<EvmConfig, DB>
where
EvmConfig: ConfigureEvm<Header = Header>,
DB: Database<Error: Into<ProviderError> + Display>,
DB: DatabaseRef<Error: Into<ProviderError> + Display> + Send + Sync,
{
type Input<'a> = BlockExecutionInput<'a, BlockWithSenders>;
type Output = ExecutionOutcome;
Expand Down
14 changes: 7 additions & 7 deletions crates/ethereum/payload/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,12 @@ where
// which also removes all dependent transaction from the iterator before we can
// continue
best_txs.mark_invalid(&pool_tx);
continue
continue;
}

// check if the job was cancelled, if so we can exit early
if cancel.is_cancelled() {
return Ok(BuildOutcome::Cancelled)
return Ok(BuildOutcome::Cancelled);
}

// convert tx to a signed transaction
Expand All @@ -221,7 +221,7 @@ where
// for regular transactions above.
trace!(target: "payload_builder", tx=?tx.hash, ?sum_blob_gas_used, ?tx_blob_gas, "skipping blob transaction because it would exceed the max data gas per block");
best_txs.mark_invalid(&pool_tx);
continue
continue;
}
}

Expand Down Expand Up @@ -249,11 +249,11 @@ where
best_txs.mark_invalid(&pool_tx);
}

continue
continue;
}
err => {
// this is an error that we should treat as fatal for this attempt
return Err(PayloadBuilderError::EvmExecutionError(err))
return Err(PayloadBuilderError::EvmExecutionError(err));
}
}
}
Expand Down Expand Up @@ -303,7 +303,7 @@ where
// check if we have a better block
if !is_better_payload(best_payload.as_ref(), total_fees) {
// can skip building the block
return Ok(BuildOutcome::Aborted { fees: total_fees, cached_reads })
return Ok(BuildOutcome::Aborted { fees: total_fees, cached_reads });
}

// calculate the requests and the requests root
Expand Down Expand Up @@ -354,7 +354,7 @@ where
// calculate the state root
let hashed_state = HashedPostState::from_bundle_state(&execution_outcome.state().state);
let (state_root, trie_output) = {
let state_provider = db.database.0.inner.borrow_mut();
let state_provider = db.database.inner.borrow_mut();
state_provider.db.state_root_with_updates(hashed_state.clone()).inspect_err(|err| {
warn!(target: "payload_builder",
parent_hash=%parent_block.hash(),
Expand Down
Loading

0 comments on commit 14ad262

Please sign in to comment.