Skip to content

Commit

Permalink
add periodic broadcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Nov 28, 2024
1 parent 451aa95 commit 23cade0
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 45 additions & 11 deletions bin/odyssey/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@
//! - `min-debug-logs`: Disables all logs below `debug` level.
//! - `min-trace-logs`: Disables all logs below `trace` level.

use alloy_network::EthereumWallet;
use alloy_network::{Ethereum, EthereumWallet, NetworkWallet};
use alloy_primitives::Address;
use alloy_signer_local::PrivateKeySigner;
use clap::Parser;
use eyre::Context;
use odyssey_node::{
broadcaster::periodic_broadcaster,
chainspec::OdysseyChainSpecParser,
node::OdysseyNode,
rpc::{EthApiExt, EthApiOverrideServer},
};
use odyssey_wallet::{OdysseyWallet, OdysseyWalletApiServer};
use odyssey_walltime::{OdysseyWallTime, OdysseyWallTimeRpcApiServer};
use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher};
use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher, NodeComponents};
use reth_optimism_cli::Cli;
use reth_optimism_node::{args::RollupArgs, node::OpAddOns};
use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions};
Expand All @@ -55,22 +56,37 @@ fn main() {

if let Err(err) =
Cli::<OdysseyChainSpecParser, RollupArgs>::parse().run(|builder, rollup_args| async move {
let (wallet, address) = init_sponsor();
let node = builder
.with_types_and_provider::<OdysseyNode, BlockchainProvider2<_>>()
.with_components(OdysseyNode::components(&rollup_args))
.with_add_ons(OpAddOns::new(rollup_args.sequencer_http))
.on_component_initialized(move |ctx| {
if let Some(address) = address {
ctx.task_executor.spawn(async move {
periodic_broadcaster(
address,
ctx.components.pool(),
ctx.components
.network
.transactions_handle()
.await
.expect("transactions_handle should be initialized"),
)
.await
});
}

Ok(())
})
.extend_rpc_modules(move |ctx| {
// override eth namespace
ctx.modules.replace_configured(
EthApiExt::new(ctx.registry.eth_api().clone()).into_rpc(),
)?;

// register odyssey wallet namespace
if let Ok(sk) = std::env::var("EXP1_SK") {
let signer: PrivateKeySigner =
sk.parse().wrap_err("Invalid EXP0001 secret key.")?;
let wallet = EthereumWallet::from(signer);

if let Some(wallet) = wallet {
let raw_delegations = std::env::var("EXP1_WHITELIST")
.wrap_err("No EXP0001 delegations specified")?;
let valid_delegations: Vec<Address> = raw_delegations
Expand All @@ -89,10 +105,6 @@ fn main() {
)
.into_rpc(),
)?;

info!(target: "reth::cli", "EXP0001 wallet configured");
} else {
warn!(target: "reth::cli", "EXP0001 wallet not configured");
}

let walltime = OdysseyWallTime::spawn(ctx.provider().canonical_state_stream());
Expand Down Expand Up @@ -121,3 +133,25 @@ fn main() {
std::process::exit(1);
}
}

fn init_sponsor() -> (Option<EthereumWallet>, Option<Address>) {
let wallet = std::env::var("EXP1_SK")
.ok()
.map(|sk| {
let wallet = sk
.parse::<PrivateKeySigner>()
.map(EthereumWallet::from)
.expect("Invalid EXP0001 secret key.");
info!(target: "reth::cli", "EXP0001 wallet configured");
wallet
})
.or_else(|| {
warn!(target: "reth::cli", "EXP0001 wallet not configured");
None
});

let address =
wallet.as_ref().map(<EthereumWallet as NetworkWallet<Ethereum>>::default_signer_address);

(wallet, address)
}
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ alloy-rpc-types-eth.workspace = true


serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
eyre.workspace = true
jsonrpsee.workspace = true
Expand Down
30 changes: 30 additions & 0 deletions crates/node/src/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//! Sponsor periodic broadcaster

use alloy_primitives::Address;
use reth_network::transactions::TransactionsHandle;
use reth_transaction_pool::TransactionPool;
use std::time::Duration;

/// Periodically broadcasts sponsored transactions from the transaction pool.
///
/// `p2p` broadcasting can potentially be flaky, and due to the p2p rules, some txs may never make
/// it to the sequencer, this can happen if a message is dropped internally when channel bounds are
/// enforced for example. So, we re-broadcast them every 10 minutes.
pub async fn periodic_broadcaster<P>(
address: Address,
pool: P,
transactions_handle: TransactionsHandle,
) where
P: TransactionPool,
{
let mut interval_timer = tokio::time::interval(Duration::from_secs(60 * 10));

loop {
let transactions =
pool.get_transactions_by_sender(address).into_iter().map(|tx| *tx.hash()).collect();

transactions_handle.propagate_transactions(transactions);

interval_timer.tick().await;
}
}
1 change: 1 addition & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![warn(unused_crate_dependencies)]

pub mod broadcaster;
pub mod chainspec;
pub mod evm;
pub mod node;
Expand Down

0 comments on commit 23cade0

Please sign in to comment.