diff --git a/Cargo.lock b/Cargo.lock index e811f2b..b8a93b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4625,6 +4625,7 @@ dependencies = [ "revm-precompile", "revm-primitives", "serde_json", + "tokio", "tracing", ] diff --git a/bin/odyssey/src/main.rs b/bin/odyssey/src/main.rs index 0f9755f..cdd770b 100644 --- a/bin/odyssey/src/main.rs +++ b/bin/odyssey/src/main.rs @@ -23,19 +23,20 @@ //! - `min-debug-logs`: Disables all logs below `debug` level. //! - `min-trace-logs`: Disables all logs below `trace` level. -use alloy_network::EthereumWallet; +use alloy_network::{Ethereum, EthereumWallet, NetworkWallet}; use alloy_primitives::Address; use alloy_signer_local::PrivateKeySigner; use clap::Parser; use eyre::Context; use odyssey_node::{ + broadcaster::periodic_broadcaster, chainspec::OdysseyChainSpecParser, node::OdysseyNode, rpc::{EthApiExt, EthApiOverrideServer}, }; use odyssey_wallet::{OdysseyWallet, OdysseyWalletApiServer}; use odyssey_walltime::{OdysseyWallTime, OdysseyWallTimeRpcApiServer}; -use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher}; +use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher, NodeComponents}; use reth_optimism_cli::Cli; use reth_optimism_node::{args::RollupArgs, node::OpAddOns}; use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions}; @@ -55,10 +56,29 @@ fn main() { if let Err(err) = Cli::::parse().run(|builder, rollup_args| async move { + let (wallet, address) = init_sponsor(); let node = builder .with_types_and_provider::>() .with_components(OdysseyNode::components(&rollup_args)) .with_add_ons(OpAddOns::new(rollup_args.sequencer_http)) + .on_component_initialized(move |ctx| { + if let Some(address) = address { + ctx.task_executor.spawn(async move { + periodic_broadcaster( + address, + ctx.components.pool(), + ctx.components + .network + .transactions_handle() + .await + .expect("transactions_handle should be initialized"), + ) + .await + }); + } + + Ok(()) + }) .extend_rpc_modules(move |ctx| { // override eth namespace ctx.modules.replace_configured( @@ -66,11 +86,7 @@ fn main() { )?; // register odyssey wallet namespace - if let Ok(sk) = std::env::var("EXP1_SK") { - let signer: PrivateKeySigner = - sk.parse().wrap_err("Invalid EXP0001 secret key.")?; - let wallet = EthereumWallet::from(signer); - + if let Some(wallet) = wallet { let raw_delegations = std::env::var("EXP1_WHITELIST") .wrap_err("No EXP0001 delegations specified")?; let valid_delegations: Vec
= raw_delegations @@ -89,10 +105,6 @@ fn main() { ) .into_rpc(), )?; - - info!(target: "reth::cli", "EXP0001 wallet configured"); - } else { - warn!(target: "reth::cli", "EXP0001 wallet not configured"); } let walltime = OdysseyWallTime::spawn(ctx.provider().canonical_state_stream()); @@ -121,3 +133,25 @@ fn main() { std::process::exit(1); } } + +fn init_sponsor() -> (Option, Option
) { + let wallet = std::env::var("EXP1_SK") + .ok() + .map(|sk| { + let wallet = sk + .parse::() + .map(EthereumWallet::from) + .expect("Invalid EXP0001 secret key."); + info!(target: "reth::cli", "EXP0001 wallet configured"); + wallet + }) + .or_else(|| { + warn!(target: "reth::cli", "EXP0001 wallet not configured"); + None + }); + + let address = + wallet.as_ref().map(>::default_signer_address); + + (wallet, address) +} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index c631739..a4b339c 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -45,6 +45,7 @@ alloy-rpc-types-eth.workspace = true serde_json.workspace = true +tokio.workspace = true tracing.workspace = true eyre.workspace = true jsonrpsee.workspace = true diff --git a/crates/node/src/broadcaster.rs b/crates/node/src/broadcaster.rs new file mode 100644 index 0000000..f23ae2f --- /dev/null +++ b/crates/node/src/broadcaster.rs @@ -0,0 +1,30 @@ +//! Sponsor periodic broadcaster + +use alloy_primitives::Address; +use reth_network::transactions::TransactionsHandle; +use reth_transaction_pool::TransactionPool; +use std::time::Duration; + +/// Periodically broadcasts sponsored transactions from the transaction pool. +/// +/// `p2p` broadcasting can potentially be flaky, and due to the p2p rules, some txs may never make +/// it to the sequencer, this can happen if a message is dropped internally when channel bounds are +/// enforced for example. So, we re-broadcast them every 10 minutes. +pub async fn periodic_broadcaster

( + address: Address, + pool: P, + transactions_handle: TransactionsHandle, +) where + P: TransactionPool, +{ + let mut interval_timer = tokio::time::interval(Duration::from_secs(60 * 10)); + + loop { + let transactions = + pool.get_transactions_by_sender(address).into_iter().map(|tx| *tx.hash()).collect(); + + transactions_handle.propagate_transactions(transactions); + + interval_timer.tick().await; + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 6966b27..c10c76c 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -15,6 +15,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![warn(unused_crate_dependencies)] +pub mod broadcaster; pub mod chainspec; pub mod evm; pub mod node;